Skip to content

Commit

Permalink
fix: Throw error in sync-inputs if total input size is too large (#290)
Browse files Browse the repository at this point in the history
Signed-off-by: Caden Marofke <[email protected]>
  • Loading branch information
marofke authored Apr 9, 2024
1 parent d8810f6 commit 4d40b8c
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 2 deletions.
20 changes: 19 additions & 1 deletion src/deadline/job_attachments/asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
""" Module for File Attachment synching """
from __future__ import annotations
import os
import shutil
import sys
import time
from io import BytesIO
Expand Down Expand Up @@ -337,6 +338,19 @@ def _record_attachment_mtimes(
abs_path = str(Path(local_root) / manifest_path.path)
self.synced_assets_mtime[abs_path] = Path(abs_path).stat().st_mtime_ns

def _ensure_disk_capacity(self, session_dir: Path, total_input_bytes: int) -> None:
"""
Raises an AssetSyncError if the given input bytes is larger than the available disk space.
"""
disk_free: int = shutil.disk_usage(session_dir).free
if total_input_bytes > disk_free:
input_size_readable = _human_readable_file_size(total_input_bytes)
disk_free_readable = _human_readable_file_size(disk_free)
raise AssetSyncError(
"Error occurred while attempting to sync input files: "
f"Total file size required for download ({input_size_readable}) is larger than available disk space ({disk_free_readable})"
)

def sync_inputs(
self,
s3_settings: Optional[JobAttachmentS3Settings],
Expand Down Expand Up @@ -447,14 +461,16 @@ def sync_inputs(

# Merge the manifests in each root into a single manifest
merged_manifests_by_root: dict[str, BaseAssetManifest] = dict()
total_input_size: int = 0
for root, manifests in grouped_manifests_by_root.items():
merged_manifest = merge_asset_manifests(manifests)

if merged_manifest:
merged_manifests_by_root[root] = merged_manifest
total_input_size += merged_manifest.totalSize # type: ignore[attr-defined]

# Download

# Virtual Download Flow
if (
attachments.fileSystem == JobAttachmentsFileSystem.VIRTUAL.value
and sys.platform != "win32"
Expand Down Expand Up @@ -482,6 +498,8 @@ def sync_inputs(
f"Virtual File System not found, falling back to {JobAttachmentsFileSystem.COPIED} for JobAttachmentsFileSystem."
)

# Copied Download flow
self._ensure_disk_capacity(session_dir, total_input_size)
try:
download_summary_statistics = download_files_from_manifests(
s3_bucket=s3_settings.s3BucketName,
Expand Down
23 changes: 23 additions & 0 deletions test/unit/deadline_job_attachments/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,29 @@ def fixture_merged_manifest():
}


@pytest.fixture(name="really_big_manifest")
def fixture_really_big_manifest():
return {
"hashAlg": "xxh128",
"manifestVersion": "2023-03-03",
"paths": [
{
"hash": "a20ddfc33590cd7d2391f1972f66a72a",
"mtime": 4444444444444444,
"path": "a.txt",
"size": 100000000000000000, # 100 Petabytes
},
{
"hash": "b96ddfc33590cd7d2391f1972f66a72a",
"mtime": 2222222222222222,
"path": "b.txt",
"size": 200000000000000000, # 200 Petabytes
},
],
"totalSize": 300000000000000000,
}


def has_posix_target_user() -> bool:
"""Returns if the testing environment exported the env variables for doing
cross-account posix target-user tests.
Expand Down
95 changes: 94 additions & 1 deletion test/unit/deadline_job_attachments/test_asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from deadline.job_attachments.os_file_permission import PosixFileSystemPermissionSettings

from deadline.job_attachments.exceptions import (
AssetSyncError,
VFSExecutableMissingError,
JobAttachmentsS3ClientError,
VFSOSUserNotSetError,
Expand Down Expand Up @@ -276,6 +277,7 @@ def test_sync_inputs_404_error(
default_queue: Queue,
job_fixture_name: str,
s3_settings_fixture_name: str,
test_manifest_one: dict,
request: pytest.FixtureRequest,
):
"""Asserts that a specific error message is raised when getting 404 errors synching inputs"""
Expand All @@ -288,6 +290,7 @@ def test_sync_inputs_404_error(
message="File not found",
)
job: Job = request.getfixturevalue(job_fixture_name)
test_manifest = decode_manifest(json.dumps(test_manifest_one))
s3_settings: JobAttachmentS3Settings = request.getfixturevalue(s3_settings_fixture_name)
default_queue.jobAttachmentSettings = s3_settings
dest_dir = "assetroot-27bggh78dd2b568ab123"
Expand All @@ -296,7 +299,7 @@ def test_sync_inputs_404_error(
# WHEN
with patch(
f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3",
return_value="test_manifest_data",
return_value=test_manifest,
), patch(
f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name",
side_effect=[dest_dir],
Expand Down Expand Up @@ -412,6 +415,15 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix(
session_dir = str(tmp_path)
dest_dir = "assetroot-27bggh78dd2b568ab123"
local_root = str(Path(session_dir) / dest_dir)
test_fs_permission_settings: PosixFileSystemPermissionSettings = (
PosixFileSystemPermissionSettings(
os_user="test-user",
os_group="test-group",
dir_mode=0o20,
file_mode=0o20,
)
)
os_env_vars: Dict[str, str] = {"AWS_PROFILE": "test-profile"}
assert job.attachments

test_manifest = decode_manifest(json.dumps(test_manifest_two))
Expand All @@ -432,10 +444,16 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix(
), patch(
f"{deadline.__package__}.job_attachments.asset_sync.merge_asset_manifests",
) as merge_manifests_mock, patch(
f"{deadline.__package__}.job_attachments.asset_sync.AssetSync._ensure_disk_capacity",
) as disk_capacity_mock, patch(
f"{deadline.__package__}.job_attachments.download._write_manifest_to_temp_file",
return_value="tmp_manifest",
), patch(
"sys.platform", "linux"
), patch(
f"{deadline.__package__}.job_attachments.asset_sync.mount_vfs_from_manifests"
), patch(
f"{deadline.__package__}.job_attachments.asset_sync.VFSProcessManager.find_vfs"
):
mock_on_downloading_files = MagicMock(return_value=True)

Expand All @@ -447,10 +465,13 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix(
tmp_path,
step_dependencies=["step-1"],
on_downloading_files=mock_on_downloading_files,
fs_permission_settings=test_fs_permission_settings,
os_env_vars=os_env_vars,
)

# THEN
merge_manifests_mock.assert_called()
disk_capacity_mock.assert_not_called()
expected_source_path_format = (
"windows"
if job.attachments.manifests[0].rootPathFormat == PathFormat.WINDOWS
Expand All @@ -465,6 +486,78 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix(
},
]

@pytest.mark.parametrize(
("job_fixture_name"),
[
("default_job"),
],
)
@pytest.mark.parametrize(
("s3_settings_fixture_name"),
[
("default_job_attachment_s3_settings"),
],
)
def test_sync_inputs_no_space_left(
self,
tmp_path: Path,
default_queue: Queue,
job_fixture_name: str,
s3_settings_fixture_name: str,
really_big_manifest: dict,
request: pytest.FixtureRequest,
):
"""Asserts that an AssetSyncError is thrown if there is not enough space left on the device to download all inputs."""
# GIVEN
job: Job = request.getfixturevalue(job_fixture_name)
s3_settings: JobAttachmentS3Settings = request.getfixturevalue(s3_settings_fixture_name)
default_queue.jobAttachmentSettings = s3_settings
dest_dir = "assetroot-27bggh78dd2b568ab123"
test_manifest = decode_manifest(json.dumps(really_big_manifest))
test_fs_permission_settings: PosixFileSystemPermissionSettings = (
PosixFileSystemPermissionSettings(
os_user="test-user",
os_group="test-group",
dir_mode=0o20,
file_mode=0o20,
)
)
os_env_vars: Dict[str, str] = {"AWS_PROFILE": "test-profile"}
assert job.attachments

# WHEN
with patch(
f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3",
return_value=test_manifest,
), patch(
f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests",
side_effect=[DownloadSummaryStatistics()],
), patch(
f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name",
side_effect=[dest_dir],
), patch.object(
Path, "stat", MagicMock(st_mtime_ns=1234512345123451)
):
mock_on_downloading_files = MagicMock(return_value=True)

with pytest.raises(AssetSyncError) as ase:
self.default_asset_sync.sync_inputs(
s3_settings,
job.attachments,
default_queue.queueId,
job.jobId,
tmp_path,
on_downloading_files=mock_on_downloading_files,
fs_permission_settings=test_fs_permission_settings,
os_env_vars=os_env_vars,
)

# THEN
assert (
"Total file size required for download (300.0 PB) is larger than available disk space"
in str(ase)
)

@mock_sts
@pytest.mark.parametrize(
(
Expand Down

0 comments on commit 4d40b8c

Please sign in to comment.