Skip to content

Commit

Permalink
feat!: refactor manifest aggregation, add helper to persist manifests…
Browse files Browse the repository at this point in the history
… and check disk capacity (#483)

* feat!: refactor manifest aggregation, add helper to persist manifests and check disk capacity

- refactor manifest aggregation
- add helper to persist manifests and check disk capacity
- breaking change: prefix underscore for a function meant to be private

Signed-off-by: Godot Bian <[email protected]>
  • Loading branch information
godobyte authored Oct 30, 2024
1 parent 7990fbf commit f57f637
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 23 deletions.
77 changes: 57 additions & 20 deletions src/deadline/job_attachments/asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def get_local_destination(
f"No path mapping rule found for the source path {manifest_properties.rootPath}"
)

def aggregate_asset_root_manifests(
def _aggregate_asset_root_manifests(
self,
session_dir: Path,
s3_settings: JobAttachmentS3Settings,
Expand All @@ -176,7 +176,7 @@ def aggregate_asset_root_manifests(
step_dependencies: Optional[list[str]] = None,
dynamic_mapping_rules: dict[str, PathMappingRule] = {},
storage_profiles_path_mapping_rules: dict[str, str] = {},
) -> DefaultDict[str, list[BaseAssetManifest]]:
) -> dict[str, BaseAssetManifest]:
"""
Args:
session_dir: the directory that the session is going to use.
Expand All @@ -187,7 +187,7 @@ def aggregate_asset_root_manifests(
step_dependencies: the list of Step IDs whose output should be downloaded over the input job attachments.
dynamic_mapping_rules: manifest root path to worker host destination mapping relative to local session.
storage_profiles_path_mapping_rules: manifest root path to worker host destination mapping given storage profile.
Returns: a dictionary of manifest files stored in the session directory.
Returns: a dictionary of manifest file stored in the session directory.
"""
grouped_manifests_by_root: DefaultDict[str, list[BaseAssetManifest]] = DefaultDict(list)

Expand Down Expand Up @@ -225,7 +225,15 @@ def aggregate_asset_root_manifests(
local_root = str(session_dir.joinpath(dir_name))
grouped_manifests_by_root[local_root].extend(manifests)

return grouped_manifests_by_root
# Merge the manifests in each root into a single manifest
merged_manifests_by_root: dict[str, BaseAssetManifest] = dict()
for root, manifests in grouped_manifests_by_root.items():
merged_manifest = merge_asset_manifests(manifests)

if merged_manifest:
merged_manifests_by_root[root] = merged_manifest

return merged_manifests_by_root

def _launch_vfs(
self,
Expand All @@ -242,7 +250,7 @@ def _launch_vfs(
fs_permission_settings: An instance defining group ownership and permission modes
to be set on the downloaded (synchronized) input files and directories.
merged_manifests_by_root: Merged manifests produced by
aggregate_asset_root_manifests()
_aggregate_asset_root_manifests()
Returns: None
Raises: VFSExecutableMissingError If VFS is not startable.
"""
Expand Down Expand Up @@ -278,7 +286,7 @@ def copied_download(
session_dir: the directory that the session is going to use.
fs_permission_settings: An instance defining group ownership and permission modes
to be set on the downloaded (synchronized) input files and directories.
merged_manifests_by_root: Merged manifests produced by aggregate_asset_root_manifests()
merged_manifests_by_root: Merged manifests produced by _aggregate_asset_root_manifests()
on_downloading_files: Callback when download files from S3.
Returns:
Expand All @@ -288,6 +296,11 @@ def copied_download(
JobAttachmentsS3ClientError if any issue is encountered while downloading.
"""
try:
total_input_size: int = 0
for merged_manifest in merged_manifests_by_root.values():
total_input_size += merged_manifest.totalSize # type: ignore[attr-defined]
self._ensure_disk_capacity(Path(session_dir), total_input_size)

return download_files_from_manifests(
s3_bucket=s3_settings.s3BucketName,
manifests_by_root=merged_manifests_by_root,
Expand All @@ -313,6 +326,41 @@ def copied_download(
else:
raise

def _check_and_write_local_manifests(
self, merged_manifests_by_root: dict[str, BaseAssetManifest], manifest_write_dir: str
) -> list[str]:
"""Write manifests to the directory and check disk capacity is sufficient for the assets.
Args:
merged_manifests_by_root (dict[str, BaseAssetManifest]): manifest file to its stored root.
manifest_write_dir (str): local directory to write to.
Returns:
list[str]: file paths the manifests are written to.
"""

total_input_size: int = 0
manifest_paths: list[str] = list()

for root, manifest in merged_manifests_by_root.items():
(_, _, manifest_name) = S3AssetUploader._gather_upload_metadata(
manifest=manifest,
source_root=Path(root),
manifest_name_suffix="manifest",
)

local_manifest_file = S3AssetUploader._write_local_input_manifest(
manifest_write_dir=manifest_write_dir,
manifest_name=manifest_name,
manifest=manifest,
)

total_input_size += manifest.totalSize # type: ignore[attr-defined]
manifest_paths.append(local_manifest_file.as_posix())

self._ensure_disk_capacity(Path(manifest_write_dir), total_input_size)
return manifest_paths

def attachment_sync_inputs(
self,
s3_settings: Optional[JobAttachmentS3Settings],
Expand Down Expand Up @@ -375,9 +423,9 @@ def attachment_sync_inputs(
attachments=attachments,
)

# Aggregate manifests (with step step dependency handling)
grouped_manifests_by_root: DefaultDict[str, list[BaseAssetManifest]] = (
self.aggregate_asset_root_manifests(
# Aggregate and merge manifests (with step step dependency handling) in each root into a single manifest
merged_manifests_by_root: dict[str, BaseAssetManifest] = (
self._aggregate_asset_root_manifests(
session_dir=session_dir,
s3_settings=s3_settings,
queue_id=queue_id,
Expand All @@ -389,16 +437,6 @@ def attachment_sync_inputs(
)
)

# Merge the manifests in each root into a single manifest
merged_manifests_by_root: dict[str, BaseAssetManifest] = dict()
total_input_size: int = 0
for root, manifests in grouped_manifests_by_root.items():
merged_manifest = merge_asset_manifests(manifests)

if merged_manifest:
merged_manifests_by_root[root] = merged_manifest
total_input_size += merged_manifest.totalSize # type: ignore[attr-defined]

# Download
summary_statistics: SummaryStatistics = SummaryStatistics()
if (
Expand All @@ -419,7 +457,6 @@ def attachment_sync_inputs(
)
else:
# Copied Download flow
self._ensure_disk_capacity(session_dir, total_input_size)
summary_statistics = self.copied_download(
s3_settings=s3_settings,
session_dir=session_dir,
Expand Down
6 changes: 4 additions & 2 deletions src/deadline/job_attachments/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,13 @@ def _write_local_manifest(

self._write_local_manifest_s3_mapping(manifest_write_dir, manifest_name, full_manifest_key)

@staticmethod
def _write_local_input_manifest(
self,
manifest_write_dir: str,
manifest_name: str,
manifest: BaseAssetManifest,
root_dir_name: Optional[str] = None,
):
) -> Path:
"""
Creates 'manifests' sub-directory and writes a local input manifest file
"""
Expand All @@ -268,6 +268,8 @@ def _write_local_input_manifest(
with open(local_manifest_file, "w") as file:
file.write(manifest.encode())

return local_manifest_file

def _write_local_manifest_s3_mapping(
self,
manifest_write_dir: str,
Expand Down
71 changes: 70 additions & 1 deletion test/unit/deadline_job_attachments/test_asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ def test_attachment_sync_inputs_404_error(
("default_job_attachment_s3_settings"),
],
)
def test_sync_attachment_inputs_with_step_dependencies(
def test_attachment_sync_inputs_with_step_dependencies(
self,
tmp_path: Path,
default_queue: Queue,
Expand Down Expand Up @@ -1471,6 +1471,75 @@ def test_attachment_sync_inputs_no_space_left(
in str(ase)
)

def test_aggregate_asset_root_manifests_and_write(
self,
default_queue: Queue,
default_job: Job,
default_job_attachment_s3_settings: JobAttachmentS3Settings,
test_manifest_one: dict,
tmp_path: Path,
):
test_manifest = decode_manifest(json.dumps(test_manifest_one))
dest_dir = "assetroot"

default_job.attachments = Attachments(
manifests=[
ManifestProperties(
rootPath="/root/tmp",
rootPathFormat=PathFormat.POSIX,
inputManifestPath="manifest_input",
inputManifestHash="manifesthash",
outputRelativeDirectories=["test/outputs"],
),
ManifestProperties(
fileSystemLocationName="Movie 1",
rootPath="/home/user/movie1",
rootPathFormat=PathFormat.POSIX,
inputManifestPath="manifest-movie1_input",
inputManifestHash="manifestmovie1hash",
outputRelativeDirectories=["test/outputs"],
),
],
)
manifest_count = len(default_job.attachments.manifests)
storage_profiles_path_mapping_rules = {
"/home/user/movie1": "/root/tmp/movie1",
}

with patch(
f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3",
return_value=test_manifest,
) as mock_get_manifest_from_s3, patch(
f"{deadline.__package__}.job_attachments.asset_sync.merge_asset_manifests",
return_value=test_manifest,
) as mock_merge_asset_manifests, patch(
f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name",
side_effect=[dest_dir],
), patch(
f"{deadline.__package__}.job_attachments.asset_sync.S3AssetUploader._write_local_input_manifest",
return_value=tmp_path.joinpath("manifest/hasn_manifest"),
) as mock__write_local_input_manifest:

merged_manifests_by_root = self.default_asset_sync._aggregate_asset_root_manifests(
session_dir=tmp_path,
s3_settings=default_job_attachment_s3_settings,
queue_id=default_queue.queueId,
job_id=default_job.jobId,
attachments=default_job.attachments,
dynamic_mapping_rules=self.default_asset_sync.generate_dynamic_path_mapping(
session_dir=tmp_path, attachments=default_job.attachments
),
storage_profiles_path_mapping_rules=storage_profiles_path_mapping_rules,
)
assert mock_merge_asset_manifests.call_count == manifest_count
assert mock_get_manifest_from_s3.call_count == manifest_count

paths = self.default_asset_sync._check_and_write_local_manifests(
merged_manifests_by_root=merged_manifests_by_root, manifest_write_dir=str(tmp_path)
)
assert mock__write_local_input_manifest.call_count == manifest_count
assert len(paths) == manifest_count

def test_attachment_sync_inputs_with_storage_profiles_path_mapping_rules(
self,
default_queue: Queue,
Expand Down

0 comments on commit f57f637

Please sign in to comment.