From 9f36fc75faf4e5dae0b4f15c99d1da9926497211 Mon Sep 17 00:00:00 2001 From: David Leong <116610336+leongdl@users.noreply.github.com> Date: Fri, 20 Sep 2024 17:38:28 -0700 Subject: [PATCH] feat(JA): manifest snapshot CLI command. Signed-off-by: David Leong <116610336+leongdl@users.noreply.github.com> --- src/deadline/client/cli/_deadline_cli.py | 2 + .../client/cli/_groups/manifest_group.py | 207 +++++++++++++ src/deadline/job_attachments/_diff.py | 115 +++++++ src/deadline/job_attachments/api/manifest.py | 143 +++++++++ src/deadline/job_attachments/exceptions.py | 6 + src/deadline/job_attachments/models.py | 11 + src/deadline/job_attachments/upload.py | 86 ++++-- test/integ/cli/test_cli_manifest.py | 93 ++++++ .../deadline_client/cli/test_cli_manifest.py | 285 ++++++++++++++++++ .../api/test_snapshot.py | 254 ++++++++++++++++ .../deadline_job_attachments/test_upload.py | 6 +- 11 files changed, 1187 insertions(+), 21 deletions(-) create mode 100644 src/deadline/client/cli/_groups/manifest_group.py create mode 100644 src/deadline/job_attachments/_diff.py create mode 100644 src/deadline/job_attachments/api/manifest.py create mode 100644 test/integ/cli/test_cli_manifest.py create mode 100644 test/unit/deadline_client/cli/test_cli_manifest.py create mode 100644 test/unit/deadline_job_attachments/api/test_snapshot.py diff --git a/src/deadline/client/cli/_deadline_cli.py b/src/deadline/client/cli/_deadline_cli.py index c4e918a6..f7f80500 100644 --- a/src/deadline/client/cli/_deadline_cli.py +++ b/src/deadline/client/cli/_deadline_cli.py @@ -23,6 +23,7 @@ from ._groups.queue_group import cli_queue from ._groups.worker_group import cli_worker from ._groups.attachment_group import cli_attachment +from ._groups.manifest_group import cli_manifest logger = getLogger(__name__) @@ -81,3 +82,4 @@ def main(ctx: click.Context, log_level: str): if os.environ.get("JOB_ATTACHMENT_CLI") is not None: main.add_command(cli_attachment) + main.add_command(cli_manifest) diff --git 
a/src/deadline/client/cli/_groups/manifest_group.py b/src/deadline/client/cli/_groups/manifest_group.py new file mode 100644 index 00000000..867b5ed4 --- /dev/null +++ b/src/deadline/client/cli/_groups/manifest_group.py @@ -0,0 +1,207 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +""" +All the `deadline manifest` commands: + * snapshot + * upload + * diff + * download +""" +from __future__ import annotations + +import dataclasses +import os +from typing import List +import click + +from deadline.job_attachments.api.manifest import ( + _manifest_snapshot, +) + +from ...exceptions import NonValidInputError +from .._common import _handle_error +from .click_logger import ClickLogger + + +@click.group(name="manifest") +@_handle_error +def cli_manifest(): + """ + Commands to work with AWS Deadline Cloud Job Attachments. + """ + + +@cli_manifest.command( + name="snapshot", + help="BETA - Generates a snapshot of files in a directory root as a Job Attachment Manifest.", +) +@click.option("--root", required=True, help="The root directory to snapshot. ") +@click.option( + "-d", + "--destination", + default=None, + help="Destination directory where manifest is created. Defaults to the manifest root directory.", +) +@click.option( + "-n", + "--name", + default=None, + help="Name of the manifest. A timestamp is added YYYY-MM-DD-HH-MM-SS for versioning.", +) +@click.option( + "-i", + "--include", + default=None, + help="Glob syntax of files and directories to include in the manifest. Can be provided multiple times.", +) +@click.option( + "-e", + "--exclude", + default=None, + help="Glob syntax of files and directories to exclude in the manifest. Can be provided multiple times.", + multiple=True, +) +@click.option( + "-ie", + "--include-exclude-config", + default=None, + help="Include and exclude config of files and directories to include and exclude. 
Can be a json file or json string.", + multiple=True, +) +@click.option("--diff", default=None, help="File Path to Asset Manifest to diff against.") +@click.option("--json", default=None, is_flag=True, help="Output is printed as JSON for scripting.") +@_handle_error +def manifest_snapshot( + root: str, + destination: str, + name: str, + include: List[str], + exclude: List[str], + include_exclude_config: str, + diff: str, + json: bool, + **args, +): + """ + Creates manifest of files specified by root directory. + """ + logger: ClickLogger = ClickLogger(is_json=json) + if not os.path.isdir(root): + raise NonValidInputError(f"Specified root directory {root} does not exist.") + + if destination and not os.path.isdir(destination): + raise NonValidInputError(f"Specified destination directory {destination} does not exist.") + elif destination is None: + destination = root + logger.echo(f"Manifest creation path defaulted to {root} \n") + + manifest_out = _manifest_snapshot( + root=root, + destination=destination, + name=name, + include=include, + exclude=exclude, + include_exclude_config=include_exclude_config, + diff=diff, + logger=logger, + ) + if manifest_out: + logger.json(dataclasses.asdict(manifest_out)) + + +@cli_manifest.command( + name="diff", + help="BETA - Compute the file difference of a root directory against an existing manifest for new, modified or deleted files.", +) +@click.option("--root", help="The root directory to compare changes to.") +@click.option( + "--manifest", + required=True, + help="The path to manifest file to diff against.", +) +@click.option( + "-i", + "--include", + default=None, + help="Glob syntax of files and directories to include in the manifest. Can be provided multiple times.", + multiple=True, +) +@click.option( + "-e", + "--exclude", + default=None, + help="Glob syntax of files and directories to exclude in the manifest. 
Can be provided multiple times.", + multiple=True, +) +@click.option( + "-ie", + "--include-exclude-config", + default=None, + help="Include and exclude config of files and directories to include and exclude. Can be a json file or json string.", +) +@click.option("--json", default=None, is_flag=True, help="Output is printed as JSON for scripting.") +@_handle_error +def manifest_diff( + root: str, + manifest: str, + include: List[str], + exclude: List[str], + include_exclude_config: str, + json: bool, + **args, +): + """ + Check file differences between a directory and specified manifest. + """ + raise NotImplementedError("This CLI is being implemented.") + + +@cli_manifest.command( + name="download", + help="BETA - Download Job Attachment Manifests for a Job, or Step including dependencies.", +) +@click.argument("download_dir") +@click.option("--profile", help="The AWS profile to use.") +@click.option("--job-id", required=True, help="The AWS Deadline Cloud Job to get. ") +@click.option("--step-id", help="The AWS Deadline Cloud Step to get. ") +@click.option("--farm-id", help="The AWS Deadline Cloud Farm to use. ") +@click.option("--queue-id", help="The AWS Deadline Cloud Queue to use. ") +@click.option( + "--json", default=None, is_flag=True, help="Output is printed as JSON for scripting. " +) +@_handle_error +def manifest_download( + download_dir: str, + job_id: str, + step_id: str, + json: bool, + **args, +): + """ + Downloads input manifest of previously submitted job. + """ + raise NotImplementedError("This CLI is being implemented.") + + +@cli_manifest.command( + name="upload", + help="BETA - Uploads a job attachment manifest file to a Content Addressable Storage's Manifest store. 
If calling via --s3-cas-path, it is recommended to use with --profile for a specific AWS profile with CAS S3 bucket access.", +) +@click.argument("manifest_file") +@click.option("--profile", help="The AWS profile to use.") +@click.option("--s3-cas-path", help="The path to the Content Addressable Storage root.") +@click.option( + "--farm-id", help="The AWS Deadline Cloud Farm to use. Alternative to using --s3-cas-path." +) +@click.option( + "--queue-id", help="The AWS Deadline Cloud Queue to use. Alternative to using --s3-cas-path." +) +@click.option("--json", default=None, is_flag=True, help="Output is printed as JSON for scripting.") +@_handle_error +def manifest_upload( + manifest_file: str, + s3_cas_path: str, + json: bool, + **args, +): + raise NotImplementedError("This CLI is being implemented.") diff --git a/src/deadline/job_attachments/_diff.py b/src/deadline/job_attachments/_diff.py new file mode 100644 index 00000000..b26f6ce8 --- /dev/null +++ b/src/deadline/job_attachments/_diff.py @@ -0,0 +1,115 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +import concurrent.futures + +import os +from pathlib import Path +from typing import Dict, List, Tuple +from deadline.client.config import config_file +from deadline.client.exceptions import NonValidInputError +from deadline.job_attachments.asset_manifests.base_manifest import ( + BaseAssetManifest, + BaseManifestPath, +) +from deadline.job_attachments.caches.hash_cache import HashCache +from deadline.job_attachments.models import AssetRootManifest, FileStatus +from deadline.job_attachments.upload import S3AssetManager + + +def diff_manifest( + asset_manager: S3AssetManager, + asset_root_manifest: AssetRootManifest, + manifest: str, + update: bool, +) -> List[(Tuple[FileStatus, BaseManifestPath])]: + """ + Gets the file paths in specified manifest if the contents of file have changed since its last snapshot. 
+ Returns a list of FileStatus and BaseManifestPath + """ + manifest_dir_name: str = os.path.basename(manifest) + root_path: str = asset_root_manifest.root_path + input_paths: List[Path] = [] + + asset_manifest = asset_root_manifest.asset_manifest + if asset_manifest is None: + raise NonValidInputError("Manifest object not found, please check input manifest. ") + + for base_manifest_path in asset_manifest.paths: + if base_manifest_path.path.startswith(manifest_dir_name): + # skip the manifest folder, or else every upload will need an update after a previous change + continue + input_paths.append(Path(root_path, base_manifest_path.path)) + + return find_file_with_status( + asset_manager=asset_manager, + input_paths=input_paths, + root_path=root_path, + update=update, + statuses=[FileStatus.NEW, FileStatus.MODIFIED], + ) + + +def find_file_with_status( + asset_manager: S3AssetManager, + input_paths: List[Path], + root_path: str, + update: bool, + statuses: List[FileStatus], +) -> List[(Tuple[FileStatus, BaseManifestPath])]: + """ + Checks a manifest file, compares it to specified root directory or manifest of files with the local hash cache, and finds files that match the specified statuses. + Returns a list of tuples containing the file information, and its corresponding file status. 
+ """ + cache_config: str = config_file.get_cache_directory() + + with HashCache(cache_config) as hash_cache: + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = { + executor.submit( + asset_manager._process_input_path, + path=path, + root_path=root_path, + hash_cache=hash_cache, + update=update, + ): path + for path in input_paths + } + status_paths: List[tuple] = [] + for future in concurrent.futures.as_completed(futures): + (file_status, _, manifestPath) = future.result() + if file_status in statuses: + status_paths.append((file_status, manifestPath)) + + return status_paths + + +def compare_manifest( + reference_manifest: BaseAssetManifest, compare_manifest: BaseAssetManifest +) -> List[(Tuple[FileStatus, BaseManifestPath])]: + """ + Compares two manifests, reference_manifest acting as the base, and compare_manifest acting as manifest with changes. + Returns a list of FileStatus and BaseManifestPath + """ + reference_dict: Dict[str, BaseManifestPath] = { + manifest_path.path: manifest_path for manifest_path in reference_manifest.paths + } + compare_dict: Dict[str, BaseManifestPath] = { + manifest_path.path: manifest_path for manifest_path in compare_manifest.paths + } + + differences: List[(Tuple[FileStatus, BaseManifestPath])] = [] + + # Find new files + for file_path, manifest_path in compare_dict.items(): + if file_path not in reference_dict: + differences.append((FileStatus.NEW, manifest_path)) + elif reference_dict[file_path].hash != manifest_path.hash: + differences.append((FileStatus.MODIFIED, manifest_path)) + else: + differences.append((FileStatus.UNCHANGED, manifest_path)) + + # Find deleted files + for file_path, manifest_path in reference_dict.items(): + if file_path not in compare_dict: + differences.append((FileStatus.DELETED, manifest_path)) + + return differences diff --git a/src/deadline/job_attachments/api/manifest.py b/src/deadline/job_attachments/api/manifest.py new file mode 100644 index 00000000..ccd66e8b --- /dev/null 
+++ b/src/deadline/job_attachments/api/manifest.py @@ -0,0 +1,143 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +import datetime +import os +from typing import List, Optional, Tuple + + +from deadline.client.api._job_attachment import _hash_attachments +from deadline.client.cli._common import _ProgressBarCallbackManager +from deadline.client.cli._groups.click_logger import ClickLogger +from deadline.job_attachments._diff import compare_manifest +from deadline.job_attachments._glob import _process_glob_inputs, _glob_paths +from deadline.job_attachments.asset_manifests.base_manifest import ( + BaseManifestPath, +) +from deadline.job_attachments.asset_manifests.decode import decode_manifest +from deadline.job_attachments.exceptions import ManifestCreationException +from deadline.job_attachments.models import ( + FileStatus, + GlobConfig, + JobAttachmentS3Settings, + ManifestSnapshot, + default_glob_all, +) +from deadline.job_attachments.upload import S3AssetManager + +""" +APIs here should be business logic only. It should perform one thing, and one thing well. +It should use basic primitives like S3 upload, download, boto3 APIs. +These APIs should be boto3 session agnostic and a specific Boto3 Credential to use. +""" + + +def _manifest_snapshot( + root: str, + destination: str, + name: str, + include: Optional[List[str]] = None, + exclude: Optional[List[str]] = None, + include_exclude_config: Optional[str] = None, + diff: Optional[str] = None, + logger: ClickLogger = ClickLogger(False), +) -> Optional[ManifestSnapshot]: + + # Get all files in the root. + glob_config: GlobConfig + if include or exclude: + include = include if include is not None else default_glob_all() + exclude = exclude if exclude is not None else [] + glob_config = GlobConfig(include_glob=include, exclude_glob=exclude) + elif include_exclude_config: + glob_config = _process_glob_inputs(include_exclude_config) + else: + # Default, include all. 
+ glob_config = GlobConfig() + + inputs = _glob_paths(root, include=glob_config.include_glob, exclude=glob_config.exclude_glob) + + # Placeholder Asset Manager + asset_manager = S3AssetManager( + farm_id=" ", queue_id=" ", job_attachment_settings=JobAttachmentS3Settings(" ", " ") + ) + + hash_callback_manager = _ProgressBarCallbackManager(length=100, label="Hashing Attachments") + + upload_group = asset_manager.prepare_paths_for_upload( + input_paths=inputs, output_paths=[root], referenced_paths=[] + ) + # We only provided 1 root path, so output should only have 1 group. + assert len(upload_group.asset_groups) == 1 + + if upload_group.asset_groups: + _, manifests = _hash_attachments( + asset_manager=asset_manager, + asset_groups=upload_group.asset_groups, + total_input_files=upload_group.total_input_files, + total_input_bytes=upload_group.total_input_bytes, + print_function_callback=logger.echo, + hashing_progress_callback=hash_callback_manager.callback, + ) + + if not manifests or len(manifests) == 0: + logger.echo("No manifest generated") + return None + + # This is a hard failure, we are snapshotting 1 directory. + assert len(manifests) == 1 + output_manifest = manifests[0].asset_manifest + if output_manifest is None: + raise ManifestCreationException() + + # If this is a diff manifest, load the supplied manifest file. 
+ if diff: + # Parse local manifest + with open(diff) as source_diff: + source_manifest_str = source_diff.read() + source_manifest = decode_manifest(source_manifest_str) + + # Get the differences + changed_paths: List[str] = [] + differences: List[Tuple[FileStatus, BaseManifestPath]] = compare_manifest( + source_manifest, output_manifest + ) + for diff_item in differences: + if diff_item[0] == FileStatus.MODIFIED or diff_item[0] == FileStatus.NEW: + full_diff_path = f"{root}/{diff_item[1].path}" + changed_paths.append(full_diff_path) + logger.echo(f"Found difference at: {full_diff_path}, Status: {diff_item[0]}") + + # Since the files are already hashed, we can easily re-use has_attachments to remake a diff manifest. + diff_group = asset_manager.prepare_paths_for_upload( + input_paths=changed_paths, output_paths=[root], referenced_paths=[] + ) + _, diff_manifests = _hash_attachments( + asset_manager=asset_manager, + asset_groups=diff_group.asset_groups, + total_input_files=diff_group.total_input_files, + total_input_bytes=diff_group.total_input_bytes, + print_function_callback=logger.echo, + hashing_progress_callback=hash_callback_manager.callback, + ) + output_manifest = diff_manifests[0].asset_manifest + + # Write created manifest into local file, at the specified location at destination + if output_manifest is not None: + + timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") + manifest_name = name if name else root.replace("/", "_") + manifest_name = manifest_name[1:] if manifest_name[0] == "_" else manifest_name + manifest_name = f"{manifest_name}-{timestamp}.manifest" + + local_manifest_file = os.path.join(destination, manifest_name) + os.makedirs(os.path.dirname(local_manifest_file), exist_ok=True) + with open(local_manifest_file, "w") as file: + file.write(output_manifest.encode()) + + # Output results. 
+ logger.echo(f"Manifest Generated at {local_manifest_file}\n") + return ManifestSnapshot(manifest=local_manifest_file) + else: + # No manifest generated. + logger.echo("No manifest generated") + return None diff --git a/src/deadline/job_attachments/exceptions.py b/src/deadline/job_attachments/exceptions.py index 217f8b8a..725f5baf 100644 --- a/src/deadline/job_attachments/exceptions.py +++ b/src/deadline/job_attachments/exceptions.py @@ -80,6 +80,12 @@ class MissingJobAttachmentSettingsError(JobAttachmentsError): """ +class ManifestCreationException(Exception): + """ + Exception for errors related to Creating Manifests. + """ + + class MissingS3BucketError(JobAttachmentsError): """ Exception raised when attempting to use Job Attachments but the S3 bucket is not set in Queue. diff --git a/src/deadline/job_attachments/models.py b/src/deadline/job_attachments/models.py index c6984189..a69eaa2c 100644 --- a/src/deadline/job_attachments/models.py +++ b/src/deadline/job_attachments/models.py @@ -412,3 +412,14 @@ def get_hashed_source(self, hash_alg: HashAlgorithm) -> str: def get_hashed_source_path(self, hash_alg: HashAlgorithm) -> str: return hash_data(self.source_path.encode("utf-8"), hash_alg) + + +class FileStatus(Enum): + """ + Status of local files compared to manifest listed files, comparing hash and time modfied + """ + + UNCHANGED = 0 + NEW = 1 + MODIFIED = 2 + DELETED = 3 diff --git a/src/deadline/job_attachments/upload.py b/src/deadline/job_attachments/upload.py index 3d520612..7be2b58e 100644 --- a/src/deadline/job_attachments/upload.py +++ b/src/deadline/job_attachments/upload.py @@ -56,6 +56,7 @@ AssetRootManifest, AssetUploadGroup, Attachments, + FileStatus, FileSystemLocationType, JobAttachmentS3Settings, ManifestProperties, @@ -164,12 +165,9 @@ def upload_assets( """ # Upload asset manifest - hash_alg = manifest.get_default_hash_alg() - manifest_bytes = manifest.encode().encode("utf-8") - manifest_name_prefix = hash_data( - f"{file_system_location_name 
or ''}{str(source_root)}".encode(), hash_alg + (hash_alg, manifest_bytes, manifest_name) = S3AssetUploader._gather_upload_metadata( + manifest, source_root, file_system_location_name ) - manifest_name = f"{manifest_name_prefix}_input" if partial_manifest_prefix: partial_manifest_key = _join_s3_paths(partial_manifest_prefix, manifest_name) @@ -182,7 +180,10 @@ def upload_assets( if manifest_write_dir: self._write_local_manifest( - manifest_write_dir, manifest_name, full_manifest_key, manifest + manifest_write_dir, + manifest_name, + full_manifest_key, + manifest, ) self.upload_bytes_to_s3( @@ -203,26 +204,74 @@ def upload_assets( return (partial_manifest_key, hash_data(manifest_bytes, hash_alg)) + @staticmethod + def _gather_upload_metadata( + manifest: BaseAssetManifest, + source_root: Path, + file_system_location_name: Optional[str] = None, + ) -> tuple[HashAlgorithm, bytes, str]: + """ + Gathers metadata information of manifest to be used for writing the local manifest + """ + hash_alg = manifest.get_default_hash_alg() + manifest_bytes = manifest.encode().encode("utf-8") + manifest_name_prefix = hash_data( + f"{file_system_location_name or ''}{str(source_root)}".encode(), hash_alg + ) + manifest_name = f"{manifest_name_prefix}_input" + + return (hash_alg, manifest_bytes, manifest_name) + def _write_local_manifest( self, manifest_write_dir: str, manifest_name: str, full_manifest_key: str, manifest: BaseAssetManifest, + root_dir_name: Optional[str] = None, ) -> None: """ Writes a manifest file locally in a 'manifests' sub-directory. Also creates/appends to a file mapping the local manifest name to the full S3 key in the same directory. 
""" - local_manifest_file = Path(manifest_write_dir, "manifests", manifest_name) + self._write_local_input_manifest(manifest_write_dir, manifest_name, manifest, root_dir_name) + + self._write_local_manifest_s3_mapping(manifest_write_dir, manifest_name, full_manifest_key) + + def _write_local_input_manifest( + self, + manifest_write_dir: str, + manifest_name: str, + manifest: BaseAssetManifest, + root_dir_name: Optional[str] = None, + ): + """ + Creates 'manifests' sub-directory and writes a local input manifest file + """ + input_manifest_folder_name = "manifests" + if root_dir_name is not None: + input_manifest_folder_name = root_dir_name + "_" + input_manifest_folder_name + + local_manifest_file = Path(manifest_write_dir, input_manifest_folder_name, manifest_name) logger.info(f"Creating local manifest file: {local_manifest_file}\n") local_manifest_file.parent.mkdir(parents=True, exist_ok=True) with open(local_manifest_file, "w") as file: file.write(manifest.encode()) - # Create or append to an existing mapping file. We use this since path lengths can go beyond the - # file name length limit on Windows if we were to create the full S3 key path locally. - manifest_map_file = Path(manifest_write_dir, "manifests", "manifest_s3_mapping") + def _write_local_manifest_s3_mapping( + self, + manifest_write_dir: str, + manifest_name: str, + full_manifest_key: str, + manifest_dir_name: Optional[str] = None, + ): + """ + Create or append to an existing mapping file. We use this since path lengths can go beyond the + file name length limit on Windows if we were to create the full S3 key path locally. 
+ """ + manifest_map_file = Path( + manifest_write_dir, manifest_dir_name or "manifests", "manifest_s3_mapping" + ) mapping = {"local_file": manifest_name, "s3_key": full_manifest_key} with open(manifest_map_file, "a") as mapping_file: mapping_file.write(f"{mapping}\n") @@ -720,7 +769,8 @@ def _process_input_path( root_path: str, hash_cache: HashCache, progress_tracker: Optional[ProgressTracker] = None, - ) -> Tuple[bool, int, base_manifest.BaseManifestPath]: + update: bool = True, + ) -> Tuple[FileStatus, int, base_manifest.BaseManifestPath]: # If it's cancelled, raise an AssetSyncCancelledError exception if progress_tracker and not progress_tracker.continue_reporting: raise AssetSyncCancelledError( @@ -733,7 +783,7 @@ def _process_input_path( hash_alg: HashAlgorithm = manifest_model.AssetManifest.get_default_hash_alg() full_path = str(path.resolve()) - is_new_or_modified: bool = False + file_status: FileStatus = FileStatus.UNCHANGED actual_modified_time = str(datetime.fromtimestamp(path.stat().st_mtime)) entry: Optional[HashCacheEntry] = hash_cache.get_entry(full_path, hash_alg) @@ -743,7 +793,7 @@ def _process_input_path( entry.last_modified_time = actual_modified_time entry.file_hash = hash_file(full_path, hash_alg) entry.hash_algorithm = hash_alg - is_new_or_modified = True + file_status = FileStatus.MODIFIED else: entry = HashCacheEntry( file_path=full_path, @@ -751,9 +801,9 @@ def _process_input_path( file_hash=hash_file(full_path, hash_alg), last_modified_time=actual_modified_time, ) - is_new_or_modified = True + file_status = FileStatus.NEW - if is_new_or_modified: + if file_status != FileStatus.UNCHANGED and update: hash_cache.put_entry(entry) file_size = path.resolve().stat().st_size @@ -767,7 +817,7 @@ def _process_input_path( path_args["mtime"] = trunc(path.stat().st_mtime_ns // 1000) path_args["size"] = file_size - return (is_new_or_modified, file_size, manifest_model.Path(**path_args)) + return (file_status, file_size, 
manifest_model.Path(**path_args)) def _create_manifest_file( self, @@ -792,10 +842,10 @@ def _create_manifest_file( for path in input_paths } for future in concurrent.futures.as_completed(futures): - (is_hashed, file_size, path_to_put_in_manifest) = future.result() + (file_status, file_size, path_to_put_in_manifest) = future.result() paths.append(path_to_put_in_manifest) if progress_tracker: - if is_hashed: + if file_status == FileStatus.NEW or file_status == FileStatus.MODIFIED: progress_tracker.increase_processed(1, file_size) else: progress_tracker.increase_skipped(1, file_size) diff --git a/test/integ/cli/test_cli_manifest.py b/test/integ/cli/test_cli_manifest.py new file mode 100644 index 00000000..06a2b882 --- /dev/null +++ b/test/integ/cli/test_cli_manifest.py @@ -0,0 +1,93 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +""" +Integ tests for the CLI asset commands. +""" +import os +import json +from click.testing import CliRunner +from deadline.client.cli._groups.manifest_group import cli_manifest +import pytest +import tempfile +from deadline.job_attachments.asset_manifests.hash_algorithms import hash_file, HashAlgorithm + +from deadline.client.cli import main + + +TEST_FILE_CONTENT = "test file content" +TEST_SUB_DIR_FILE_CONTENT = "subdir file content" +TEST_ROOT_DIR_FILE_CONTENT = "root file content" + +TEST_ROOT_FILE = "root_file.txt" +TEST_SUB_FILE = "subdir_file.txt" + +TEST_ROOT_DIR = "root_dir" +TEST_MANIFEST_DIR = "manifest_dir" +TEST_SUB_DIR_1 = "subdir1" +TEST_SUB_DIR_2 = "subdir2" + + +class TestSnapshot: + + @pytest.fixture + def temp_dir(self): + with tempfile.TemporaryDirectory() as tmpdir_path: + yield tmpdir_path + + def test_snapshot_basic(self, temp_dir): + """ + Snapshot with a valid root directory containing one file, and no other parameters. Basic test the CLI calls into the API. + Deeper testing is done at the API layer. 
+ """ + # Given + root_dir = os.path.join(temp_dir, TEST_ROOT_DIR) + os.makedirs(root_dir) + manifest_dir = os.path.join(temp_dir, TEST_MANIFEST_DIR) + os.makedirs(manifest_dir) + file_path = os.path.join(root_dir, TEST_ROOT_FILE) + with open(file_path, "w") as f: + f.write(TEST_FILE_CONTENT) + + # When + runner = CliRunner() + # Temporary, always add cli_manifest until launched. + main.add_command(cli_manifest) + result = runner.invoke( + main, + [ + "manifest", + "snapshot", + "--root", + root_dir, + "--destination", + manifest_dir, + "--name", + "test", + "--json", + ], + ) + assert result.exit_code == 0, f"Non-Zeo exit code, CLI output {result.output}" + + # Then + # Check manifest file details to match correct content + manifest_files = os.listdir(manifest_dir) + assert ( + len(manifest_files) == 1 + ), f"Expected exactly one manifest file, but got {len(manifest_files)}" + manifest_file_name = manifest_files[0] + assert ( + "test" in manifest_file_name + ), f"Expected test in manifest file name, got {manifest_file_name}" + + manifest_file_path = os.path.join(manifest_dir, manifest_file_name) + + with open(manifest_file_path, "r") as f: + manifest_data = json.load(f) + + expected_hash = hash_file(file_path, HashAlgorithm("xxh128")) # hashed with xxh128 + manifest_data_paths = manifest_data["paths"] + assert ( + len(manifest_data_paths) == 1 + ), f"Expected exactly one path inside manifest, but got {len(manifest_data_paths)}" + assert manifest_data_paths[0]["path"] == TEST_ROOT_FILE + assert manifest_data_paths[0]["hash"] == expected_hash diff --git a/test/unit/deadline_client/cli/test_cli_manifest.py b/test/unit/deadline_client/cli/test_cli_manifest.py new file mode 100644 index 00000000..ccb70286 --- /dev/null +++ b/test/unit/deadline_client/cli/test_cli_manifest.py @@ -0,0 +1,285 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ +import os +import pytest +from unittest.mock import patch, Mock, MagicMock +from click.testing import CliRunner + +from deadline.client.cli import main +from deadline.client.cli._groups import manifest_group +from deadline.client.api import _submit_job_bundle +from deadline.job_attachments.models import ( + AssetRootGroup, + JobAttachmentS3Settings, + Attachments, +) +from deadline.job_attachments.upload import S3AssetManager, S3AssetUploader +from deadline.job_attachments.caches import HashCache +from deadline.job_attachments.asset_manifests.base_manifest import BaseManifestPath +from deadline.job_attachments.asset_manifests.v2023_03_03 import AssetManifest +from deadline.job_attachments.asset_manifests.hash_algorithms import HashAlgorithm + + +@pytest.fixture +def mock_cachedb(): + mock_hash_cache = MagicMock(spec=HashCache) + mock_hash_cache.__enter__.return_value = mock_hash_cache + mock_hash_cache.__exit__.return_value = None + return mock_hash_cache + + +@pytest.fixture +def mock_prepare_paths_for_upload(): + with patch.object(S3AssetManager, "prepare_paths_for_upload") as mock: + yield mock + + +@pytest.fixture +def mock_hash_attachments(): + with patch( + "deadline.job_attachments.api.manifest._hash_attachments", return_value=(Mock(), []) + ) as mock: + yield mock + + +@pytest.fixture +def basic_asset_group(tmp_path): + root_dir = str(tmp_path) + return AssetRootGroup( + root_path=root_dir, + inputs=set(), + outputs=set(), + references=set(), + ) + + +@pytest.fixture +def mock_upload_group(basic_asset_group): + return Mock( + asset_groups=[basic_asset_group], + total_input_files=1, + total_input_bytes=100, + ) + + +@pytest.fixture +def basic_asset_manifest(): + return AssetManifest(paths=[], hash_alg=HashAlgorithm("xxh128"), total_size=0) + + +@pytest.fixture +def mock_attachment_settings(): + return Attachments(manifests=[], fileSystem="").to_dict + + +@pytest.fixture +def mock_init_objects(): + with patch.object(S3AssetManager, "__init__", lambda self, 
*args, **kwargs: None), patch.object( + S3AssetUploader, "__init__", lambda self, *args, **kwargs: None + ), patch.object(JobAttachmentS3Settings, "__init__", lambda self, *args, **kwargs: None): + yield + + +@pytest.fixture +def mock_update_manifest(basic_asset_manifest): + with patch.object(manifest_group, "update_manifest", return_value=basic_asset_manifest) as mock: + yield mock + + +@pytest.fixture +def mock_upload_attachments(): + with patch.object( + _submit_job_bundle.api, "_upload_attachments", return_value=MOCK_UPLOAD_ATTACHMENTS_RESPONSE + ) as mock: + yield mock + + +@pytest.fixture +def mock_create_manifest_file(): + def _mock_create_manifest_file(input_paths, root_path, hash_cache): + return AssetManifest( + paths=[ + BaseManifestPath( + path=os.path.join(root_path, "file1.txt"), hash="mock_hash_1", size=0, mtime=0 + ), + BaseManifestPath( + path=os.path.join(root_path, "subdir1", "file2.txt"), + hash="mock_hash_2", + size=0, + mtime=0, + ), + BaseManifestPath( + path=os.path.join(root_path, "subdir2", "subdir3", "file3.txt"), + hash="mock_hash_3", + size=0, + mtime=0, + ), + ], + hash_alg=HashAlgorithm("xxh128"), + total_size=0, + ) + + with patch.object( + S3AssetManager, "_create_manifest_file", side_effect=_mock_create_manifest_file + ): + yield + + +@pytest.fixture +def mock_read_local_manifest(): + def _mock_read_local_manifest(manifest): + return AssetManifest( + paths=[ + BaseManifestPath(path="file1.txt", hash="old_hash_1", size=0, mtime=0), + BaseManifestPath(path="subdir1/file2.txt", hash="old_hash_2", size=0, mtime=0), + BaseManifestPath( + path="subdir2/subdir3/file3.txt", hash="old_hash_3", size=0, mtime=0 + ), + ], + hash_alg=HashAlgorithm("xxh128"), + total_size=0, + ) + + with patch.object(manifest_group, "read_local_manifest", side_effect=_mock_read_local_manifest): + yield + + +MOCK_ROOT_DIR = "/path/to/root" +MOCK_MANIFEST_DIR = "/path/to/manifest" +MOCK_MANIFEST_OUT_DIR = "path/to/out/dir" +MOCK_MANIFEST_FILE = 
os.path.join(MOCK_MANIFEST_DIR, "manifest_input") +MOCK_INVALID_DIR = "/nopath/" +MOCK_UPLOAD_ATTACHMENTS_RESPONSE = {"manifests": [{"inputManifestPath": "s3://mock/manifest.json"}]} +MOCK_JOB_ATTACHMENTS = { + "manifests": [ + { + "inputManifestHash": "mock_input_manifest_hash", + "inputManifestPath": "mock_input_manifest_path", + "outputRelativeDirectories": ["mock_output_dir"], + "rootPath": "mock_root_path", + "rootPathFormat": "mock_root_path_format", + } + ] +} +MOCK_QUEUE = { + "queueId": "queue-0123456789abcdef0123456789abcdef", + "displayName": "mock_queue", + "description": "mock_description", + "jobAttachmentSettings": {"s3BucketName": "mock_bucket", "rootPrefix": "mock_deadline"}, +} + + +@pytest.mark.skip("Random Failure with no credentials on Github") +class TestSnapshot: + + def test_snapshot_root_directory_only( + self, tmp_path, mock_prepare_paths_for_upload, mock_hash_attachments, mock_upload_group + ): + """ + Tests if CLI snapshot command calls correctly with an existing directory path at --root + """ + root_dir = str(tmp_path) + + temp_file = tmp_path / "temp_file.txt" + temp_file.touch() + + mock_prepare_paths_for_upload.return_value = mock_upload_group + + runner = CliRunner() + main.add_command(manifest_group.cli_manifest) + result = runner.invoke(main, ["manifest", "snapshot", "--root", root_dir]) + + assert result.exit_code == 0 + mock_prepare_paths_for_upload.assert_called_once_with( + input_paths=[str(temp_file)], output_paths=[root_dir], referenced_paths=[] + ) + mock_hash_attachments.assert_called_once() + + def test_invalid_root_directory(self, tmp_path): + """ + Tests if CLI snapshot raises error when called with an invalid --root with non-existing directory path + """ + invalid_root_dir = str(tmp_path / "invalid_dir") + + runner = CliRunner() + main.add_command(manifest_group.cli_manifest) + result = runner.invoke(main, ["manifest", "snapshot", "--root", invalid_root_dir]) + + assert result.exit_code != 0 + assert
f"{invalid_root_dir}" in result.output + + def test_valid_manifest_out( + self, tmp_path, mock_prepare_paths_for_upload, mock_hash_attachments, mock_upload_group + ): + """ + Tests if CLI snapshot command correctly calls with --manifest-out arguement + """ + root_dir = str(tmp_path) + manifest_out_dir = tmp_path / "manifest_out" + manifest_out_dir.mkdir() + + temp_file = tmp_path / "temp_file.txt" + temp_file.touch() + + mock_prepare_paths_for_upload.return_value = mock_upload_group + + runner = CliRunner() + main.add_command(manifest_group.cli_manifest) + result = runner.invoke( + main, + [ + "manifest", + "snapshot", + "--root", + root_dir, + ], + ) + + assert result.exit_code == 0 + mock_prepare_paths_for_upload.assert_called_once_with( + input_paths=[str(temp_file)], output_paths=[root_dir], referenced_paths=[] + ) + mock_hash_attachments.assert_called_once() + + def test_invalid_manifest_out(self, tmp_path): + """ + Tests if CLI snapshot raises error when called with invalid --destination with non-existing directory path + """ + root_dir = str(tmp_path) + invalid_manifest_out = str(tmp_path / "nonexistent_dir") + + runner = CliRunner() + main.add_command(manifest_group.cli_manifest) + result = runner.invoke( + main, + ["manifest", "snapshot", "--root", root_dir, "--destination", invalid_manifest_out], + ) + + assert result.exit_code != 0 + assert f"{invalid_manifest_out}" in result.output + + def test_asset_snapshot_recursive( + self, tmp_path, mock_prepare_paths_for_upload, mock_hash_attachments, mock_upload_group + ): + """ + Tests if CLI snapshot works with snapshotting directories with nested data. 
+ """ + root_dir = str(tmp_path) + subdir1 = tmp_path / "subdir1" + subdir2 = tmp_path / "subdir2" + subdir1.mkdir() + subdir2.mkdir() + (subdir1 / "file1.txt").touch() + (subdir2 / "file2.txt").touch() + + expected_inputs = {str(subdir2 / "file2.txt"), str(subdir1 / "file1.txt")} + mock_prepare_paths_for_upload.return_value = mock_upload_group + + runner = CliRunner() + main.add_command(manifest_group.cli_manifest) + result = runner.invoke(main, ["manifest", "snapshot", "--root", root_dir]) + + assert result.exit_code == 0 + actual_inputs = set(mock_prepare_paths_for_upload.call_args[1]["input_paths"]) + assert actual_inputs == expected_inputs + mock_hash_attachments.assert_called_once() diff --git a/test/unit/deadline_job_attachments/api/test_snapshot.py b/test/unit/deadline_job_attachments/api/test_snapshot.py new file mode 100644 index 00000000..9af406bc --- /dev/null +++ b/test/unit/deadline_job_attachments/api/test_snapshot.py @@ -0,0 +1,254 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +import json +import os +import tempfile +from typing import List, Optional +from deadline.job_attachments.api.manifest import _manifest_snapshot +from deadline.job_attachments.exceptions import ManifestCreationException +from deadline.job_attachments.models import ManifestSnapshot +import pytest + + +class TestSnapshotAPI: + + @pytest.fixture + def temp_dir(self): + with tempfile.TemporaryDirectory() as tmpdir_path: + yield tmpdir_path + + def test_snapshot_empty_folder(self, temp_dir): + """ + Snapshot with an invalid folder. Should find nothing and no manifest. + """ + + # Given foobar folder + root_dir = os.path.join(temp_dir, "foobar") + os.makedirs(root_dir) + + # When, Then. + with pytest.raises(ManifestCreationException): + _manifest_snapshot(root=root_dir, destination=temp_dir, name="test") + + def test_snapshot_folder(self, temp_dir): + """ + Snapshot with a folder and a single file in it. Should generate a manifest containing 1 file. 
+ """ + + # Given snapshot folder and 1 test file + root_dir = os.path.join(temp_dir, "snapshot") + + test_file_name = "test_file" + test_file = os.path.join(root_dir, test_file_name) + os.makedirs(os.path.dirname(test_file), exist_ok=True) + with open(test_file, "w") as f: + f.write("testing123") + + # When + manifest: Optional[ManifestSnapshot] = _manifest_snapshot( + root=root_dir, destination=temp_dir, name="test" + ) + + # Then + assert manifest is not None + assert manifest.manifest is not None + with open(manifest.manifest, "r") as manifest_file: + manifest_payload = json.load(manifest_file) + assert len(manifest_payload["paths"]) == 1 + assert manifest_payload["paths"][0]["path"] == test_file_name + + def test_snapshot_recursive_folder(self, temp_dir): + """ + Snapshot with a folder a file, a nested folder and a file in the nested folder. + """ + + # Given snapshot folder and 1 test file + root_dir = os.path.join(temp_dir, "snapshot") + + test_file_name = "test_file" + test_file = os.path.join(root_dir, test_file_name) + os.makedirs(os.path.dirname(test_file), exist_ok=True) + with open(test_file, "w") as f: + f.write("testing123") + + nested_test_file_name = "nested_file" + nested_folder = "nested" + nested_test_file = os.path.join(root_dir, nested_folder, nested_test_file_name) + os.makedirs(os.path.dirname(nested_test_file), exist_ok=True) + with open(nested_test_file, "w") as f: + f.write("testing123") + + # When + manifest: Optional[ManifestSnapshot] = _manifest_snapshot( + root=root_dir, destination=temp_dir, name="test" + ) + + # Then + assert manifest is not None + assert manifest.manifest is not None + with open(manifest.manifest, "r") as manifest_file: + manifest_payload = json.load(manifest_file) + assert len(manifest_payload["paths"]) == 2 + files = set() + for item in manifest_payload["paths"]: + files.add(item["path"]) + + assert test_file_name in files + assert f"{nested_folder}/{nested_test_file_name}" in files + + @pytest.mark.parametrize( 
+ "includes, excludes, results", + [ + pytest.param( + ["test_file", "**/nested_file"], None, ["test_file", "nested/nested_file"] + ), + pytest.param( + None, + ["excluded_test_file", "**/excluded_nested_file"], + ["test_file", "nested/nested_file"], + ), + pytest.param( + ["test_file"], ["excluded_test_file", "**/excluded_nested_file"], ["test_file"] + ), + pytest.param( + ["**/nested_file"], + ["excluded_test_file", "**/excluded_nested_file"], + ["nested/nested_file"], + ), + ], + ) + def test_snapshot_includes_excludes( + self, temp_dir, includes: List[str], excludes: List[str], results: List[str] + ): + """ + Snapshot with a folder a file, a nested folder and a file in the nested folder. + Include glob includes "test_file", "nested_file". + Should not pick up "excluded". + """ + + # Given snapshot folder and 1 test file + root_dir = os.path.join(temp_dir, "snapshot") + + test_file_name = "test_file" + test_file = os.path.join(root_dir, test_file_name) + os.makedirs(os.path.dirname(test_file), exist_ok=True) + with open(test_file, "w") as f: + f.write("testing123") + + excluded_test_file_name = "excluded_test_file" + excluded_test_file = os.path.join(root_dir, excluded_test_file_name) + with open(excluded_test_file, "w") as f: + f.write("testing123") + + nested_test_file_name = "nested_file" + nested_folder = "nested" + nested_test_file = os.path.join(root_dir, nested_folder, nested_test_file_name) + os.makedirs(os.path.dirname(nested_test_file), exist_ok=True) + with open(nested_test_file, "w") as f: + f.write("testing123") + + nested_excluded_test_file_name = "excluded_nested_file" + nested_excluded_test_file = os.path.join( + root_dir, nested_folder, nested_excluded_test_file_name + ) + with open(nested_excluded_test_file, "w") as f: + f.write("testing123") + + # When + manifest: Optional[ManifestSnapshot] = _manifest_snapshot( + root=root_dir, + destination=temp_dir, + name="test", + include=includes, + exclude=excludes, + ) + + # Then + assert manifest is 
not None + assert manifest.manifest is not None + with open(manifest.manifest, "r") as manifest_file: + manifest_payload = json.load(manifest_file) + assert len(manifest_payload["paths"]) == len(results) + files = set() + for item in manifest_payload["paths"]: + files.add(item["path"]) + + for result in results: + assert result in files + + def test_snapshot_diff(self, temp_dir): + """ + Create a snapshot with 1 file. Add a second file and make a diff manifest. + Only the second file should be found. + """ + # Given snapshot folder and 1 test file + root_dir = os.path.join(temp_dir, "snapshot") + + test_file_name = "test_file" + test_file = os.path.join(root_dir, test_file_name) + os.makedirs(os.path.dirname(test_file), exist_ok=True) + with open(test_file, "w") as f: + f.write("testing123") + + # When + manifest: Optional[ManifestSnapshot] = _manifest_snapshot( + root=root_dir, destination=temp_dir, name="test" + ) + + # Then + assert manifest is not None + assert manifest.manifest is not None + + # Given a second new file. + second_test_file_name = "second_file" + second_test_file = os.path.join(root_dir, second_test_file_name) + os.makedirs(os.path.dirname(second_test_file), exist_ok=True) + with open(second_test_file, "w") as f: + f.write("second123") + + # When snapshot again. + diffed_manifest: Optional[ManifestSnapshot] = _manifest_snapshot( + root=root_dir, destination=temp_dir, name="test", diff=manifest.manifest + ) + + # Then. We should find only the second file. + assert diffed_manifest is not None + assert diffed_manifest.manifest is not None + with open(diffed_manifest.manifest, "r") as diff_manifest_file: + manifest_payload = json.load(diff_manifest_file) + assert len(manifest_payload["paths"]) == 1 + files = set() + for item in manifest_payload["paths"]: + files.add(item["path"]) + + assert second_test_file_name in files + + def test_snapshot_diff_no_diff(self, temp_dir): + """ + Create a snapshot with 1 file. Snapshot again and diff. 
It should have no manifest. + """ + # Given snapshot folder and 1 test file + root_dir = os.path.join(temp_dir, "snapshot") + + test_file_name = "test_file" + test_file = os.path.join(root_dir, test_file_name) + os.makedirs(os.path.dirname(test_file), exist_ok=True) + with open(test_file, "w") as f: + f.write("testing123") + + # When + manifest: Optional[ManifestSnapshot] = _manifest_snapshot( + root=root_dir, destination=temp_dir, name="test" + ) + + # Then + assert manifest is not None + assert manifest.manifest is not None + + # When snapshot again. + diffed_manifest: Optional[ManifestSnapshot] = _manifest_snapshot( + root=root_dir, destination=temp_dir, name="test", diff=manifest.manifest + ) + + # Then. We should find no new manifest, there were no files to snapshot + assert diffed_manifest is None diff --git a/test/unit/deadline_job_attachments/test_upload.py b/test/unit/deadline_job_attachments/test_upload.py index 85801f8e..d78c514e 100644 --- a/test/unit/deadline_job_attachments/test_upload.py +++ b/test/unit/deadline_job_attachments/test_upload.py @@ -52,7 +52,7 @@ ProgressStatus, SummaryStatistics, ) -from deadline.job_attachments.upload import S3AssetManager, S3AssetUploader +from deadline.job_attachments.upload import FileStatus, S3AssetManager, S3AssetUploader from deadline.job_attachments._utils import _human_readable_file_size from ..conftest import is_windows_non_admin @@ -1605,7 +1605,7 @@ def test_process_input_path_cached_file_is_updated( ) # THEN - assert is_hashed is True + assert is_hashed == FileStatus.NEW or is_hashed == FileStatus.MODIFIED assert man_path.path == "test.txt" assert man_path.hash == "b" hash_cache.put_entry.assert_called_with(expected_entry) @@ -1642,7 +1642,7 @@ def test_process_input_path_skip_file_already_in_hash_cache(self, farm_id, queue ) # THEN - assert is_hashed is False + assert is_hashed == FileStatus.UNCHANGED assert size == file_bytes assert man_path.path == "test.txt" assert man_path.hash == "a"