diff --git a/src/deadline/client/api/__init__.py b/src/deadline/client/api/__init__.py index 877c55a4..4933e6ee 100644 --- a/src/deadline/client/api/__init__.py +++ b/src/deadline/client/api/__init__.py @@ -5,6 +5,7 @@ "logout", "create_job_from_job_bundle", "hash_attachments", + "upload_attachments", "wait_for_create_job_to_complete", "get_boto3_session", "get_boto3_client", @@ -57,6 +58,7 @@ create_job_from_job_bundle, wait_for_create_job_to_complete, hash_attachments, + upload_attachments, ) from ._telemetry import ( get_telemetry_client, diff --git a/src/deadline/client/api/_submit_job_bundle.py b/src/deadline/client/api/_submit_job_bundle.py index eadcbc4b..11d3605c 100644 --- a/src/deadline/client/api/_submit_job_bundle.py +++ b/src/deadline/client/api/_submit_job_bundle.py @@ -285,7 +285,7 @@ def create_job_from_job_bundle( hashing_progress_callback=hashing_progress_callback, ) - attachment_settings = _upload_attachments( + attachment_settings = upload_attachments( asset_manager, asset_manifests, print_function_callback, upload_progress_callback ) attachment_settings["fileSystem"] = JobAttachmentsFileSystem( @@ -432,7 +432,7 @@ def _default_update_hash_progress(hashing_metadata: Dict[str, str]) -> bool: return hashing_summary, manifests -def _upload_attachments( +def upload_attachments( asset_manager: S3AssetManager, manifests: List[AssetRootManifest], print_function_callback: Callable = lambda msg: None, diff --git a/src/deadline/client/cli/_groups/asset_group.py b/src/deadline/client/cli/_groups/asset_group.py index 6f1bce15..95932fd8 100644 --- a/src/deadline/client/cli/_groups/asset_group.py +++ b/src/deadline/client/cli/_groups/asset_group.py @@ -9,15 +9,27 @@ """ import os from pathlib import Path +import concurrent.futures +from typing import List +import glob import click from deadline.client import api -from deadline.job_attachments.upload import S3AssetManager, S3AssetUploader -from deadline.job_attachments.models import JobAttachmentS3Settings 
+from deadline.job_attachments.upload import FileStatus, S3AssetManager, S3AssetUploader +from deadline.job_attachments.models import ( + JobAttachmentS3Settings, + AssetRootManifest, +) +from deadline.job_attachments.asset_manifests.decode import decode_manifest +from deadline.job_attachments.asset_manifests.base_manifest import BaseAssetManifest +from deadline.job_attachments.caches import HashCache -from .._common import _handle_error, _ProgressBarCallbackManager -from ...exceptions import NonValidInputError +from .._common import _apply_cli_options_to_config, _handle_error, _ProgressBarCallbackManager +from ...exceptions import NonValidInputError, ManifestOutdatedError +from ...config import get_setting, config_file +import boto3 +from botocore.client import BaseClient @click.group(name="asset") @@ -104,7 +116,13 @@ def asset_snapshot(root_dir: str, manifest_out: str, recursive: bool, **args): @cli_asset.command(name="upload") @click.option( - "--manifest", help="The path to manifest folder of the directory specified for upload. " + "--root-dir", + help="The root directory of assets to upload. Defaults to the parent directory of --manifest-dir if not specified. ", +) +@click.option( + "--manifest-dir", + required=True, + help="The path to manifest folder of the directory specified for upload. ", ) @click.option("--farm-id", help="The AWS Deadline Cloud Farm to use. ") @click.option("--queue-id", help="The AWS Deadline Cloud Queue to use. ") @@ -116,11 +134,104 @@ def asset_snapshot(root_dir: str, manifest_out: str, recursive: bool, **args): default=False, ) @_handle_error -def asset_upload(**args): +def asset_upload(root_dir: str, manifest_dir: str, update: bool, **args): """ Uploads the assets in the provided manifest file to S3. """ - click.echo("upload done") + + if not os.path.isdir(manifest_dir): + raise NonValidInputError(f"Specified manifest directory {manifest_dir} does not exist. 
") + + if root_dir is None: + asset_root_dir = os.path.dirname(manifest_dir) + else: + if not os.path.isdir(root_dir): + raise NonValidInputError(f"Specified root directory {root_dir} does not exist. ") + asset_root_dir = root_dir + + config = _apply_cli_options_to_config(required_options={"farm_id", "queue_id"}, **args) + upload_callback_manager: _ProgressBarCallbackManager = _ProgressBarCallbackManager( + length=100, label="Uploading Attachments" + ) + + deadline: BaseClient = api.get_boto3_client("deadline", config=config) + queue_id: str = get_setting("defaults.queue_id", config=config) + farm_id: str = get_setting("defaults.farm_id", config=config) + + queue: dict = deadline.get_queue( + farmId=farm_id, + queueId=queue_id, + ) + + # assume queue role - session permissions + queue_role_session: boto3.Session = api.get_queue_user_boto3_session( + deadline=deadline, + config=config, + farm_id=farm_id, + queue_id=queue_id, + queue_display_name=queue["displayName"], + ) + + asset_manager: S3AssetManager = S3AssetManager( + farm_id=farm_id, + queue_id=queue_id, + job_attachment_settings=JobAttachmentS3Settings(**queue["jobAttachmentSettings"]), + session=queue_role_session, + ) + + asset_uploader: S3AssetUploader = S3AssetUploader() + + # read local manifest into BaseAssetManifest object + asset_manifest: BaseAssetManifest = read_local_manifest(manifest=manifest_dir) + clear_S3_mapping(manifest=manifest_dir) + + if asset_manifest is None: + raise NonValidInputError( + f"Specified manifest directory {manifest_dir} does contain valid manifest input file. 
" + ) + + asset_root_manifest: AssetRootManifest = AssetRootManifest( + root_path=asset_root_dir, + asset_manifest=asset_manifest, + ) + + manifest_changes: List[tuple] = diff_manifest( + asset_manager=asset_manager, + asset_root_manifest=asset_root_manifest, + manifest=manifest_dir, + update=update, + ) + + # if there are modified files, will either auto --update manifest or prompt user of file discrepancy + if len(manifest_changes) > 0: + if update: + asset_root_manifest.asset_manifest = update_manifest( + manifest=manifest_dir, new_or_modified_paths=manifest_changes + ) + click.echo(f"Manifest information updated: {len(manifest_changes)} files updated. \n") + else: + raise ManifestOutdatedError( + f"Manifest contents in {manifest_dir} are outdated; versioning does not match local files in {asset_root_dir}. Please run with --update to fix current files. \n" + ) + + attachment_settings: dict = api.upload_attachments( + asset_manager=asset_manager, + manifests=[asset_root_manifest], + print_function_callback=click.echo, + upload_progress_callback=upload_callback_manager.callback, + ) + + full_manifest_key: str = attachment_settings["manifests"][0]["inputManifestPath"] + manifest_name = os.path.basename(full_manifest_key) + manifest_dir_name = os.path.basename(manifest_dir) + asset_uploader._write_local_manifest_s3_mapping( + manifest_write_dir=asset_root_dir, + manifest_name=manifest_name, + full_manifest_key=full_manifest_key, + manifest_dir_name=manifest_dir_name, + ) + + click.echo(f"Upload of {asset_root_dir} complete. \n") @cli_asset.command(name="diff") @@ -155,3 +266,145 @@ def asset_download(**args): Downloads input manifest of previously submitted job. 
""" click.echo("download complete") + + +def read_local_manifest(manifest: str) -> BaseAssetManifest: + """ + Read manifests specified by filepath to manifest folder, returns BaseAssetManifest Object + """ + input_files = glob.glob(os.path.join(manifest, "*_input")) + + if not input_files: + raise ValueError(f"No manifest files found in {manifest}") + elif len(input_files) >= 2: + raise NonValidInputError( + f"Multiple input manifest files are not supported, found: {input_files}." + ) + + manifest_file_path = input_files[0] + + with open(manifest_file_path, "r") as input_file: + manifest_data_str = input_file.read() + asset_manifest = decode_manifest(manifest_data_str) + + return asset_manifest + + +def clear_S3_mapping(manifest: str): + """ + Clears manifest_s3_mapping file contents if it previously exists. + """ + for filename in os.listdir(manifest): + if filename.endswith("manifest_s3_mapping"): + # if S3 mapping already exists, clear contents + filepath = os.path.join(manifest, filename) + with open(filepath, "w") as _: + pass + + +def diff_manifest( + asset_manager: S3AssetManager, + asset_root_manifest: AssetRootManifest, + manifest: str, + update: bool, +) -> List[tuple]: + """ + Gets the file paths in specified manifest if the contents of file have changed since its last snapshot. + """ + manifest_dir_name = os.path.basename(manifest) + root_path = asset_root_manifest.root_path + input_paths: List[Path] = [] + + asset_manifest = asset_root_manifest.asset_manifest + if asset_manifest is None: + raise NonValidInputError("Manifest object not found, please check input manifest. 
") + + for base_manifest_path in asset_manifest.paths: + if base_manifest_path.path.startswith(manifest_dir_name): + # skip the manifest folder, or else every upload will need an update after a previous change + continue + input_paths.append(Path(root_path, base_manifest_path.path)) + + return find_file_with_status( + asset_manager=asset_manager, + input_paths=input_paths, + root_path=root_path, + update=update, + statuses=[FileStatus.NEW, FileStatus.MODIFIED], + ) + + +def find_file_with_status( + asset_manager: S3AssetManager, + input_paths: List[Path], + root_path: str, + update: bool, + statuses: List[FileStatus], +) -> List[tuple]: + """ + Checks a manifest file, compares it to specified root directory or manifest of files with the local hash cache, and finds files that match the specified statuses. + Returns a list of tuples containing the file information, and its corresponding file status. + """ + cache_config = config_file.get_cache_directory() + + with HashCache(cache_config) as hash_cache: + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = { + executor.submit( + asset_manager._process_input_path, + path=path, + root_path=root_path, + hash_cache=hash_cache, + update=update, + ): path + for path in input_paths + } + status_paths: List[tuple] = [] + for future in concurrent.futures.as_completed(futures): + (file_status, _, manifestPath) = future.result() + if file_status in statuses: + status_paths.append((file_status, manifestPath)) + + return status_paths + + +def update_manifest(manifest: str, new_or_modified_paths: List[tuple]) -> BaseAssetManifest: + """ + Updates the local manifest file to reflect modified or new files + """ + input_files = glob.glob(os.path.join(manifest, "*_input")) + + if not input_files: + raise ValueError(f"No manifest files found in {manifest}") + elif len(input_files) >= 2: + raise NonValidInputError( + f"Multiple input manifest files are not supported, found: {input_files}." 
+ ) + + manifest_file_path = input_files[0] + + with open(manifest_file_path, "r") as manifest_file: + manifest_data_str = manifest_file.read() + local_base_asset_manifest = decode_manifest(manifest_data_str) + + # maps paths of local to optimize updating of manifest entries + manifest_info_dict = { + base_manifest_path.path: base_manifest_path + for base_manifest_path in local_base_asset_manifest.paths + } + + for _, base_asset_manifest in new_or_modified_paths: + if base_asset_manifest.path in manifest_info_dict: + # Update the hash_value of the existing object + manifest_info_dict[base_asset_manifest.path].hash = base_asset_manifest.hash + else: + # Add the new object if it doesn't exist + manifest_info_dict[base_asset_manifest.path] = base_asset_manifest + + # write to local manifest + updated_path_list = list(manifest_info_dict.values()) + local_base_asset_manifest.paths = updated_path_list + with open(manifest_file_path, "w") as manifest_file: + manifest_file.write(local_base_asset_manifest.encode()) + + return local_base_asset_manifest diff --git a/src/deadline/client/exceptions.py b/src/deadline/client/exceptions.py index 5830d7c6..7a527dc4 100644 --- a/src/deadline/client/exceptions.py +++ b/src/deadline/client/exceptions.py @@ -19,3 +19,7 @@ class UserInitiatedCancel(Exception): class NonValidInputError(Exception): """Error for when the user input is nonvalid""" + + +class ManifestOutdatedError(Exception): + """Error for when local files are different from version captured in manifest""" diff --git a/src/deadline/job_attachments/upload.py b/src/deadline/job_attachments/upload.py index 7c0fffb6..a91c38ad 100644 --- a/src/deadline/job_attachments/upload.py +++ b/src/deadline/job_attachments/upload.py @@ -17,6 +17,7 @@ from math import trunc from pathlib import Path, PurePath from typing import Any, Callable, Generator, Optional, Tuple, Type, Union +from enum import Enum import boto3 from boto3.s3.transfer import ProgressCallbackInvoker @@ -82,6 +83,16 @@ 
S3_UPLOAD_MAX_CONCURRENCY: int = 10 +class FileStatus(Enum): + """ + Status of local files compared to manifest listed files, comparing hash and time modified + """ + + UNCHANGED = 0 + NEW = 1 + MODIFIED = 2 + + class S3AssetUploader: """ Handler for uploading assets to S3 based off of an Asset Manifest. If no session is provided the default @@ -239,7 +250,7 @@ def _write_local_input_manifest( manifest_write_dir: str, manifest_name: str, manifest: BaseAssetManifest, - root_dir_name: Optional[str], + root_dir_name: Optional[str] = None, ): """ Creates 'manifests' sub-directory and writes a local input manifest file @@ -259,12 +270,15 @@ def _write_local_manifest_s3_mapping( manifest_write_dir: str, manifest_name: str, full_manifest_key: str, + manifest_dir_name: Optional[str] = None, ): """ Create or append to an existing mapping file. We use this since path lengths can go beyond the file name length limit on Windows if we were to create the full S3 key path locally. """ - manifest_map_file = Path(manifest_write_dir, "manifests", "manifest_s3_mapping") + manifest_map_file = Path( + manifest_write_dir, manifest_dir_name or "manifests", "manifest_s3_mapping" + ) mapping = {"local_file": manifest_name, "s3_key": full_manifest_key} with open(manifest_map_file, "a") as mapping_file: mapping_file.write(f"{mapping}\n") @@ -762,7 +776,8 @@ def _process_input_path( root_path: str, hash_cache: HashCache, progress_tracker: Optional[ProgressTracker] = None, - ) -> Tuple[bool, int, base_manifest.BaseManifestPath]: + update: bool = True, + ) -> Tuple[FileStatus, int, base_manifest.BaseManifestPath]: # If it's cancelled, raise an AssetSyncCancelledError exception if progress_tracker and not progress_tracker.continue_reporting: raise AssetSyncCancelledError( @@ -775,7 +790,7 @@ hash_alg: HashAlgorithm = manifest_model.AssetManifest.get_default_hash_alg() full_path = str(path.resolve()) - is_new_or_modified: bool = False + file_status: FileStatus = 
FileStatus.UNCHANGED actual_modified_time = str(datetime.fromtimestamp(path.stat().st_mtime)) entry: Optional[HashCacheEntry] = hash_cache.get_entry(full_path, hash_alg) @@ -785,7 +800,7 @@ def _process_input_path( entry.last_modified_time = actual_modified_time entry.file_hash = hash_file(full_path, hash_alg) entry.hash_algorithm = hash_alg - is_new_or_modified = True + file_status = FileStatus.MODIFIED else: entry = HashCacheEntry( file_path=full_path, @@ -793,9 +808,9 @@ def _process_input_path( file_hash=hash_file(full_path, hash_alg), last_modified_time=actual_modified_time, ) - is_new_or_modified = True + file_status = FileStatus.NEW - if is_new_or_modified: + if file_status != FileStatus.UNCHANGED and update: hash_cache.put_entry(entry) file_size = path.resolve().stat().st_size @@ -809,7 +824,7 @@ def _process_input_path( path_args["mtime"] = trunc(path.stat().st_mtime_ns // 1000) path_args["size"] = file_size - return (is_new_or_modified, file_size, manifest_model.Path(**path_args)) + return (file_status, file_size, manifest_model.Path(**path_args)) def _create_manifest_file( self, @@ -834,10 +849,10 @@ def _create_manifest_file( for path in input_paths } for future in concurrent.futures.as_completed(futures): - (is_hashed, file_size, path_to_put_in_manifest) = future.result() + (file_status, file_size, path_to_put_in_manifest) = future.result() paths.append(path_to_put_in_manifest) if progress_tracker: - if is_hashed: + if file_status == FileStatus.NEW or file_status == FileStatus.MODIFIED: progress_tracker.increase_processed(1, file_size) else: progress_tracker.increase_skipped(1, file_size) diff --git a/test/unit/deadline_client/cli/test_cli_asset.py b/test/unit/deadline_client/cli/test_cli_asset.py index 9f4e0626..e7d53ca8 100644 --- a/test/unit/deadline_client/cli/test_cli_asset.py +++ b/test/unit/deadline_client/cli/test_cli_asset.py @@ -3,11 +3,21 @@ import pytest from unittest.mock import patch, Mock from click.testing import CliRunner +import os 
from deadline.client.cli import main +from deadline.client.cli._groups import asset_group from deadline.client import api -from deadline.job_attachments.upload import S3AssetManager -from deadline.job_attachments.models import AssetRootGroup +from deadline.client.api import _submit_job_bundle +from deadline.job_attachments.models import AssetRootGroup, JobAttachmentS3Settings, Attachments +from deadline.job_attachments.upload import S3AssetManager, S3AssetUploader +from deadline.job_attachments.asset_manifests.v2023_03_03 import AssetManifest +from deadline.job_attachments.asset_manifests.hash_algorithms import HashAlgorithm + +from ..api.test_job_bundle_submission import ( + MOCK_FARM_ID, + MOCK_QUEUE_ID, +) @pytest.fixture @@ -23,7 +33,7 @@ def mock_hash_attachments(): @pytest.fixture -def asset_group_mock(tmp_path): +def basic_asset_group(tmp_path): root_dir = str(tmp_path) return AssetRootGroup( root_path=root_dir, @@ -34,18 +44,57 @@ def asset_group_mock(tmp_path): @pytest.fixture -def upload_group_mock(asset_group_mock): +def mock_upload_group(basic_asset_group): return Mock( - asset_groups=[asset_group_mock], + asset_groups=[basic_asset_group], total_input_files=1, total_input_bytes=100, ) +@pytest.fixture +def basic_asset_manifest(): + return AssetManifest(paths=[], hash_alg=HashAlgorithm("xxh128"), total_size=0) + + +@pytest.fixture +def mock_attachment_settings(): + return Attachments(manifests=[], fileSystem="").to_dict + + +@pytest.fixture +def mock_init_objects(): + with patch.object(S3AssetManager, "__init__", lambda self, *args, **kwargs: None), patch.object( + S3AssetUploader, "__init__", lambda self, *args, **kwargs: None + ), patch.object(JobAttachmentS3Settings, "__init__", lambda self, *args, **kwargs: None): + yield + + +@pytest.fixture +def mock_update_manifest(basic_asset_manifest): + with patch.object(asset_group, "update_manifest", return_value=basic_asset_manifest) as mock: + yield mock + + +@pytest.fixture +def mock_upload_attachments(): 
+ with patch.object( + _submit_job_bundle.api, "upload_attachments", return_value=MOCK_UPLOAD_ATTACHMENTS_RESPONSE + ) as mock: + yield mock + + +MOCK_ROOT_DIR = "/path/to/root" +MOCK_MANIFEST_DIR = "/path/to/manifest" +MOCK_MANIFEST_FILE = os.path.join(MOCK_MANIFEST_DIR, "manifest_input") +MOCK_INVALID_DIR = "/nopath/" +MOCK_UPLOAD_ATTACHMENTS_RESPONSE = {"manifests": [{"inputManifestPath": "s3://mock/manifest.json"}]} + + class TestSnapshot: def test_snapshot_root_directory_only( - self, tmp_path, mock_prepare_paths_for_upload, mock_hash_attachments, upload_group_mock + self, tmp_path, mock_prepare_paths_for_upload, mock_hash_attachments, mock_upload_group ): """ Tests if CLI snapshot command calls correctly with an exiting directory path at --root-dir @@ -55,7 +104,7 @@ def test_snapshot_root_directory_only( temp_file = tmp_path / "temp_file.txt" temp_file.touch() - mock_prepare_paths_for_upload.return_value = upload_group_mock + mock_prepare_paths_for_upload.return_value = mock_upload_group runner = CliRunner() result = runner.invoke(main, ["asset", "snapshot", "--root-dir", root_dir]) @@ -79,7 +128,7 @@ def test_invalid_root_directory(self, tmp_path): assert f"Specified root directory {invalid_root_dir} does not exist. 
" in result.output def test_valid_manifest_out( - self, tmp_path, mock_prepare_paths_for_upload, mock_hash_attachments, upload_group_mock + self, tmp_path, mock_prepare_paths_for_upload, mock_hash_attachments, mock_upload_group ): """ Tests if CLI snapshot command correctly calls with --manifest-out arguement @@ -91,7 +140,7 @@ def test_valid_manifest_out( temp_file = tmp_path / "temp_file.txt" temp_file.touch() - mock_prepare_paths_for_upload.return_value = upload_group_mock + mock_prepare_paths_for_upload.return_value = mock_upload_group runner = CliRunner() result = runner.invoke( @@ -132,7 +181,7 @@ def test_invalid_manifest_out(self, tmp_path): ) def test_asset_snapshot_recursive( - self, tmp_path, mock_prepare_paths_for_upload, mock_hash_attachments, upload_group_mock + self, tmp_path, mock_prepare_paths_for_upload, mock_hash_attachments, mock_upload_group ): """ Tests if CLI snapshot --recursive flag is called correctly @@ -146,7 +195,7 @@ def test_asset_snapshot_recursive( (subdir2 / "file2.txt").touch() expected_inputs = {str(subdir2 / "file2.txt"), str(subdir1 / "file1.txt")} - mock_prepare_paths_for_upload.return_value = upload_group_mock + mock_prepare_paths_for_upload.return_value = mock_upload_group runner = CliRunner() result = runner.invoke(main, ["asset", "snapshot", "--root-dir", root_dir, "--recursive"]) @@ -155,3 +204,216 @@ def test_asset_snapshot_recursive( actual_inputs = set(mock_prepare_paths_for_upload.call_args[1]["input_paths"]) assert actual_inputs == expected_inputs mock_hash_attachments.assert_called_once() + + +class TestUpload: + + def test_upload_valid_manifest( + fresh_deadline_config, mock_init_objects, mock_upload_attachments + ): + """ + Test the asset upload command with correct arguments and valid manifest path. 
+ """ + with patch.object(_submit_job_bundle.api, "get_boto3_client"), patch.object( + _submit_job_bundle.api, "get_queue_user_boto3_session" + ), patch.object(os.path, "isdir", side_effect=[True, True]), patch.object( + os, "listdir", return_value=["manifest_input"] + ), patch.object( + asset_group, "read_local_manifest", return_value=basic_asset_manifest + ), patch.object( + asset_group, "diff_manifest", return_value=[] + ), patch.object( + S3AssetUploader, "_write_local_manifest_s3_mapping" + ) as mock_write_manifest_mapping: + + runner = CliRunner() + result = runner.invoke( + main, + [ + "asset", + "upload", + "--root-dir", + MOCK_ROOT_DIR, + "--manifest-dir", + MOCK_MANIFEST_DIR, + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + ], + ) + + full_manifest_key = MOCK_UPLOAD_ATTACHMENTS_RESPONSE["manifests"][0]["inputManifestPath"] + manifest_name = os.path.basename(full_manifest_key) + manifest_dir_name = os.path.basename(MOCK_MANIFEST_DIR) + + mock_write_manifest_mapping.assert_called_once_with( + manifest_write_dir=MOCK_ROOT_DIR, + manifest_name=manifest_name, + full_manifest_key=full_manifest_key, + manifest_dir_name=manifest_dir_name, + ) + mock_upload_attachments.assert_called_once() + assert result.exit_code == 0 + + def test_upload_invalid_manifest_dir(fresh_deadline_config): + """ + Test the asset upload command when the manifest directory is not a valid directory. + """ + with patch("deadline.client.cli._groups.asset_group.os.path.isdir", return_value=False): + + runner = CliRunner() + result = runner.invoke( + main, + [ + "asset", + "upload", + "--root-dir", + MOCK_ROOT_DIR, + "--manifest-dir", + MOCK_INVALID_DIR, + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + ], + ) + + assert f"Specified manifest directory {MOCK_INVALID_DIR} does not exist. 
" + assert result.exit_code == 1 + + def test_upload_with_update( + fresh_deadline_config, + mock_init_objects, + mock_update_manifest, + mock_upload_attachments, + basic_asset_manifest, + ): + """ + Test the asset upload command with the --update flag, and manifest has valid updates to find. + """ + + with patch.object(_submit_job_bundle.api, "get_boto3_client"), patch.object( + _submit_job_bundle.api, "get_queue_user_boto3_session" + ), patch.object(os.path, "isdir", side_effect=[True, True]), patch.object( + os, "listdir", return_value=["manifest_input"] + ), patch.object( + asset_group, "read_local_manifest", return_value=basic_asset_manifest + ), patch.object( + asset_group, "diff_manifest", return_value=["/path/to/modified/file.txt"] + ), patch.object( + S3AssetUploader, "_write_local_manifest_s3_mapping" + ) as mock_write_manifest_mapping: + + runner = CliRunner() + result = runner.invoke( + main, + [ + "asset", + "upload", + "--root-dir", + MOCK_ROOT_DIR, + "--manifest-dir", + MOCK_MANIFEST_DIR, + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + "--update", + ], + ) + + full_manifest_key = MOCK_UPLOAD_ATTACHMENTS_RESPONSE["manifests"][0]["inputManifestPath"] + + manifest_name = os.path.basename(full_manifest_key) + manifest_dir_name = os.path.basename(MOCK_MANIFEST_DIR) + mock_write_manifest_mapping.assert_called_once_with( + manifest_write_dir=MOCK_ROOT_DIR, + manifest_name=manifest_name, + full_manifest_key=full_manifest_key, + manifest_dir_name=manifest_dir_name, + ) + + mock_update_manifest.assert_called_once_with( + manifest=MOCK_MANIFEST_DIR, new_or_modified_paths=["/path/to/modified/file.txt"] + ) + mock_upload_attachments.assert_called_once() + assert "Manifest information updated:" in result.output + assert result.exit_code == 0 + + def test_upload_with_modified_files_without_update(fresh_deadline_config, mock_init_objects): + """ + Test the asset upload command when there are modified files, but the --update flag is not provided. 
+ """ + mock_modified_files = ["/path/to/modified/file1.txt", "/path/to/modified/file2.txt"] + + with patch.object(_submit_job_bundle.api, "get_boto3_client"), patch.object( + _submit_job_bundle.api, "get_queue_user_boto3_session" + ), patch.object(os.path, "isdir", side_effect=[True, True]), patch.object( + os, "listdir", return_value=["manifest_input"] + ), patch.object( + asset_group, "read_local_manifest", return_value=basic_asset_manifest + ), patch.object( + asset_group, "diff_manifest", return_value=mock_modified_files + ): + + runner = CliRunner() + result = runner.invoke( + main, + [ + "asset", + "upload", + "--root-dir", + MOCK_ROOT_DIR, + "--manifest-dir", + MOCK_MANIFEST_DIR, + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + ], + ) + + assert ( + f"Manifest contents in {MOCK_MANIFEST_DIR} are outdated; versioning does not match local files in {MOCK_ROOT_DIR}. Please run with --update to fix current files. " + in result.output + ) + assert result.exit_code == 1 + + def test_cli_asset_upload_read_local_manifest_returns_none( + fresh_deadline_config, mock_init_objects + ): + """ + Test the asset upload command when the read_local_manifest function returns None. + """ + + with patch.object(_submit_job_bundle.api, "get_boto3_client"), patch.object( + _submit_job_bundle.api, "get_queue_user_boto3_session" + ), patch.object(os.path, "isdir", side_effect=[True, True]), patch.object( + os, "listdir", return_value=["manifest_input"] + ), patch.object( + asset_group, "read_local_manifest", return_value=None + ): + + runner = CliRunner() + result = runner.invoke( + main, + [ + "asset", + "upload", + "--root-dir", + MOCK_ROOT_DIR, + "--manifest-dir", + MOCK_MANIFEST_DIR, + "--farm-id", + MOCK_FARM_ID, + "--queue-id", + MOCK_QUEUE_ID, + ], + ) + + assert ( + f"Specified manifest directory {MOCK_MANIFEST_DIR} does contain valid manifest input file." 
+ in result.output + ) + assert result.exit_code == 1 diff --git a/test/unit/deadline_client/cli/test_cli_bundle.py b/test/unit/deadline_client/cli/test_cli_bundle.py index b59a14dd..6180c3c5 100644 --- a/test/unit/deadline_client/cli/test_cli_bundle.py +++ b/test/unit/deadline_client/cli/test_cli_bundle.py @@ -122,7 +122,7 @@ def test_cli_bundle_submit(fresh_deadline_config, temp_job_bundle_dir): ), patch.object( _submit_job_bundle.api, "get_queue_user_boto3_session" ), patch.object( - _submit_job_bundle, "_upload_attachments" + _submit_job_bundle, "upload_attachments" ), patch.object( bundle_group.api, "get_deadline_cloud_library_telemetry_client" ): @@ -341,7 +341,7 @@ def test_cli_bundle_asset_load_method(fresh_deadline_config, temp_job_bundle_dir ) as qp_boto3_client_mock, patch.object( _submit_job_bundle, "hash_attachments", return_value=(attachment_mock, {}) ), patch.object( - _submit_job_bundle, "_upload_attachments", return_value={} + _submit_job_bundle, "upload_attachments", return_value={} ), patch.object( _submit_job_bundle.api, "get_boto3_session" ), patch.object( @@ -635,7 +635,7 @@ def test_cli_bundle_accept_upload_confirmation(fresh_deadline_config, temp_job_b ) as get_boto3_client_mock, patch.object( _submit_job_bundle, "hash_attachments", return_value=[SummaryStatistics(), "test"] ), patch.object( - _submit_job_bundle, "_upload_attachments" + _submit_job_bundle, "upload_attachments" ), patch.object( _submit_job_bundle.api, "get_boto3_session" ), patch.object( @@ -713,7 +713,7 @@ def test_cli_bundle_reject_upload_confirmation(fresh_deadline_config, temp_job_b ) as qp_boto3_client_mock, patch.object( _submit_job_bundle, "hash_attachments", return_value=[SummaryStatistics(), "test"] ), patch.object( - _submit_job_bundle, "_upload_attachments" + _submit_job_bundle, "upload_attachments" ) as upload_attachments_mock, patch.object( _submit_job_bundle.api, "get_boto3_session" ), patch.object( diff --git a/test/unit/deadline_job_attachments/test_upload.py 
b/test/unit/deadline_job_attachments/test_upload.py index ac8d4ac6..457f4f98 100644 --- a/test/unit/deadline_job_attachments/test_upload.py +++ b/test/unit/deadline_job_attachments/test_upload.py @@ -52,7 +52,7 @@ ProgressStatus, SummaryStatistics, ) -from deadline.job_attachments.upload import S3AssetManager, S3AssetUploader +from deadline.job_attachments.upload import FileStatus, S3AssetManager, S3AssetUploader from deadline.job_attachments._utils import _human_readable_file_size from ..conftest import is_windows_non_admin @@ -1605,7 +1605,7 @@ def test_process_input_path_cached_file_is_updated( ) # THEN - assert is_hashed is True + assert is_hashed == FileStatus.NEW or is_hashed == FileStatus.MODIFIED assert man_path.path == "test.txt" assert man_path.hash == "b" hash_cache.put_entry.assert_called_with(expected_entry) @@ -1642,7 +1642,7 @@ def test_process_input_path_skip_file_already_in_hash_cache(self, farm_id, queue ) # THEN - assert is_hashed is False + assert is_hashed == FileStatus.UNCHANGED assert size == file_bytes assert man_path.path == "test.txt" assert man_path.hash == "a"