From b5b203bf5b370cb5e51b3e60f6b052e87e11d6d7 Mon Sep 17 00:00:00 2001
From: David Leong <116610336+leongdl@users.noreply.github.com>
Date: Fri, 11 Oct 2024 12:25:19 -0700
Subject: [PATCH] feat(JA): Add manifest upload and download to complete the
 JA standalone API+CLI featureset.

Signed-off-by: David Leong <116610336+leongdl@users.noreply.github.com>
---
 .../client/cli/_groups/manifest_group.py      | 115 +++++++-
 src/deadline/job_attachments/api/manifest.py  | 205 ++++++++++++-
 src/deadline/job_attachments/models.py        |   5 +-
 test/integ/cli/__init__.py                    |   1 +
 test/integ/cli/conftest.py                    | 171 +++++++++++
 test/integ/cli/test_cli_manifest.py           | 156 ----------
 test/integ/cli/test_cli_manifest_download.py  | 275 ++++++++++++++++++
 test/integ/cli/test_cli_manifest_upload.py    | 140 +++++++++
 test/integ/cli/test_data/inputs/scene.ma      |   1 +
 .../cli/test_data/inputs/textures/brick.png   |   1 +
 .../cli/test_data/inputs/textures/cloth.png   |   1 +
 .../outputs/not_for_sync_outputs.txt          |   1 +
 test/integ/cli/test_utils.py                  |  64 ++++
 test/integ/conftest.py                        | 142 +++++++++
 .../deadline_job_attachments/conftest.py      |  89 ------
 .../api/test_manifest_download.py             |  80 +++++
 .../api/test_manifest_upload.py               |  89 ++++++
 17 files changed, 1276 insertions(+), 260 deletions(-)
 create mode 100644 test/integ/cli/__init__.py
 create mode 100644 test/integ/cli/conftest.py
 delete mode 100644 test/integ/cli/test_cli_manifest.py
 create mode 100644 test/integ/cli/test_cli_manifest_download.py
 create mode 100644 test/integ/cli/test_cli_manifest_upload.py
 create mode 100644 test/integ/cli/test_data/inputs/scene.ma
 create mode 100644 test/integ/cli/test_data/inputs/textures/brick.png
 create mode 100644 test/integ/cli/test_data/inputs/textures/cloth.png
 create mode 100644 test/integ/cli/test_data/outputs/not_for_sync_outputs.txt
 create mode 100644 test/integ/cli/test_utils.py
 create mode 100644 test/unit/deadline_job_attachments/api/test_manifest_download.py
 create mode 100644 test/unit/deadline_job_attachments/api/test_manifest_upload.py

diff --git a/src/deadline/client/cli/_groups/manifest_group.py b/src/deadline/client/cli/_groups/manifest_group.py
index 4e461e2a..f6435fad 100644
--- a/src/deadline/client/cli/_groups/manifest_group.py
+++ b/src/deadline/client/cli/_groups/manifest_group.py
@@ -9,21 +9,31 @@
 """
 from __future__ import annotations

+from configparser import ConfigParser
 import dataclasses
 import os
-from typing import List
+from typing import List, Optional

+import boto3
 import click

+from deadline.client import api
+from deadline.client.config import config_file
 from deadline.job_attachments._diff import pretty_print_cli
 from deadline.job_attachments.api.manifest import (
     _glob_files,
     _manifest_diff,
+    _manifest_download,
     _manifest_snapshot,
+    _manifest_upload,
+)
+from deadline.job_attachments.models import (
+    S3_MANIFEST_FOLDER_NAME,
+    JobAttachmentS3Settings,
+    ManifestDiff,
 )
-from deadline.job_attachments.models import ManifestDiff

 from ...exceptions import NonValidInputError
-from .._common import _handle_error
+from .._common import _apply_cli_options_to_config, _handle_error
 from .click_logger import ClickLogger

@@ -228,28 +238,113 @@ def manifest_download(
     """
     Downloads input manifest of previously submitted job.
     """
-    raise NotImplementedError("This CLI is being implemented.")
+    logger: ClickLogger = ClickLogger(is_json=json)
+    if not os.path.isdir(download_dir):
+        raise NonValidInputError(f"Specified destination directory {download_dir} does not exist.")
") + + # setup config + config: Optional[ConfigParser] = _apply_cli_options_to_config( + required_options={"farm_id", "queue_id"}, **args + ) + queue_id: str = config_file.get_setting("defaults.queue_id", config=config) + farm_id: str = config_file.get_setting("defaults.farm_id", config=config) + + boto3_session: boto3.Session = api.get_boto3_session(config=config) + + output = _manifest_download( + download_dir=download_dir, + farm_id=farm_id, + queue_id=queue_id, + job_id=job_id, + step_id=step_id, + boto3_session=boto3_session, + logger=logger, + ) + logger.json(dataclasses.asdict(output)) @cli_manifest.command( name="upload", - help="BETA - Uploads a job attachment manifest file to a Content Addressable Storage's Manifest store. If calling via --s3-cas-path, it is recommended to use with --profile for a specific AWS profile with CAS S3 bucket access.", + help="BETA - Uploads a job attachment manifest file to a Content Addressable Storage's Manifest store. If calling via --s3-cas-path, it is recommended to use with --profile for a specific AWS profile with CAS S3 bucket access. Check exit code for success or failure.", ) @click.argument("manifest_file") @click.option("--profile", help="The AWS profile to use.") -@click.option("--s3-cas-path", help="The path to the Content Addressable Storage root.") +@click.option("--s3-cas-uri", help="The URI to the Content Addressable Storage S3 bucket and root.") +@click.option( + "--s3-manifest-prefix", help="Prefix subpath in the manifest folder to upload the manifest." +) @click.option( - "--farm-id", help="The AWS Deadline Cloud Farm to use. Alternative to using --s3-cas-path." + "--farm-id", help="The AWS Deadline Cloud Farm to use. Alternative to using --s3-cas-uri." ) @click.option( - "--queue-id", help="The AWS Deadline Cloud Queue to use. Alternative to using --s3-cas-path." + "--queue-id", help="The AWS Deadline Cloud Queue to use. Alternative to using --s3-cas-uri." ) @click.option("--json", default=None, is_flag=True, help="Output is printed as JSON for scripting.") @_handle_error def manifest_upload( manifest_file: str, - s3_cas_path: str, + s3_cas_uri: str, + s3_manifest_prefix: str, json: bool, **args, ): - raise NotImplementedError("This CLI is being implemented.") + # Input checking. + if not manifest_file or not os.path.isfile(manifest_file): + raise NonValidInputError(f"Specified manifest {manifest_file} does not exist. ") + + # Where will we upload the manifest to? + required: set[str] = set() + if not s3_cas_uri: + required = {"farm_id", "queue_id"} + + config: Optional[ConfigParser] = _apply_cli_options_to_config(required_options=required, **args) + + # Logger + logger: ClickLogger = ClickLogger(is_json=json) + + bucket_name: str = "" + session: boto3.Session = api.get_boto3_session(config=config) + if not s3_cas_uri: + farm_id = config_file.get_setting("defaults.farm_id", config=config) + queue_id = config_file.get_setting("defaults.queue_id", config=config) + + deadline = api.get_boto3_client("deadline", config=config) + queue = deadline.get_queue( + farmId=farm_id, + queueId=queue_id, + ) + queue_ja_settings: JobAttachmentS3Settings = JobAttachmentS3Settings( + **queue["jobAttachmentSettings"] + ) + bucket_name = queue_ja_settings.s3BucketName + cas_path = queue_ja_settings.rootPrefix + + # IF we supplied a farm and queue, use the queue credentials. 
+        session = api.get_queue_user_boto3_session(
+            deadline=deadline,
+            config=config,
+            farm_id=farm_id,
+            queue_id=queue_id,
+            queue_display_name=queue["displayName"],
+        )
+
+    else:
+        # Self-supplied CAS path.
+        uri_ja_settings: JobAttachmentS3Settings = JobAttachmentS3Settings.from_s3_root_uri(
+            s3_cas_uri
+        )
+        bucket_name = uri_ja_settings.s3BucketName
+        cas_path = uri_ja_settings.rootPrefix
+
+    logger.echo(
+        f"Uploading Manifest to {bucket_name} {cas_path} {S3_MANIFEST_FOLDER_NAME}, prefix: {s3_manifest_prefix}"
+    )
+    _manifest_upload(
+        manifest_file=manifest_file,
+        s3_bucket_name=bucket_name,
+        s3_cas_prefix=cas_path,
+        s3_key_prefix=s3_manifest_prefix,
+        boto_session=session,
+        logger=logger,
+    )
+    logger.echo("Upload successful!")
diff --git a/src/deadline/job_attachments/api/manifest.py b/src/deadline/job_attachments/api/manifest.py
index 614ad922..8c3e1b07 100644
--- a/src/deadline/job_attachments/api/manifest.py
+++ b/src/deadline/job_attachments/api/manifest.py
@@ -1,10 +1,14 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

 import datetime
+from io import BytesIO
 import os
 from pathlib import Path
-from typing import List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple

+import boto3
+
+from deadline.client.api._session import _get_queue_user_boto3_session, get_default_client_config
 from deadline.client.cli._groups.click_logger import ClickLogger
 from deadline.job_attachments._diff import _fast_file_list_to_manifest_diff, compare_manifest
 from deadline.job_attachments._glob import _process_glob_inputs, _glob_paths
@@ -19,14 +23,23 @@
 from deadline.job_attachments.asset_manifests.decode import decode_manifest
 from deadline.job_attachments.asset_manifests.hash_algorithms import hash_data
 from deadline.job_attachments.caches.hash_cache import HashCache
+from deadline.job_attachments.download import (
+    get_manifest_from_s3,
+    get_output_manifests_by_asset_root,
+    merge_asset_manifests,
+)
 from deadline.job_attachments.models import (
+    S3_MANIFEST_FOLDER_NAME,
     FileStatus,
     GlobConfig,
+    JobAttachmentS3Settings,
     ManifestDiff,
+    ManifestDownload,
+    ManifestDownloadResponse,
     ManifestSnapshot,
     default_glob_all,
 )
-from deadline.job_attachments.upload import S3AssetManager
+from deadline.job_attachments.upload import S3AssetManager, S3AssetUploader

 """
 APIs here should be business logic only. It should perform one thing, and one thing well.
@@ -247,3 +260,191 @@ def process_output(status: FileStatus, path: str, output_diff: ManifestDiff):
         process_output(fast_diff_item[1], fast_diff_item[0], output)

     return output
+
+
+def _manifest_upload(
+    manifest_file: str,
+    s3_bucket_name: str,
+    s3_cas_prefix: str,
+    boto_session: boto3.Session,
+    s3_key_prefix: Optional[str] = None,
+    logger: ClickLogger = ClickLogger(False),
+):
+    """
+    BETA API - This API is still evolving but will be made public in the near future.
+    API to upload a job attachment manifest to the Content Addressable Storage. Manifests will be
+    uploaded to s3://{s3_bucket_name}/{cas_prefix}/Manifests/{s3_key_prefix}/{manifest_file_name} as per the Deadline CAS folder structure.
+    manifest_file: File path to the manifest file for upload.
+    s3_bucket_name: S3 bucket name.
+    s3_cas_prefix: S3 Content Addressable Storage prefix.
+    s3_key_prefix: [Optional] Prefix subpath within the manifest folder to upload to.
+    boto_session: Boto3 session.
+    logger: Click Logger instance to print to CLI as text or JSON.
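+
+    Example (illustrative only; the session, bucket name, and prefix below are placeholders):
+        _manifest_upload(
+            manifest_file="/tmp/scene.manifest",
+            s3_bucket_name="my-job-attachments-bucket",
+            s3_cas_prefix="DeadlineCloud",
+            boto_session=boto3.Session(),
+        )
+    This would upload the manifest to s3://my-job-attachments-bucket/DeadlineCloud/Manifests/scene.manifest.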
+    """
+    # S3 metadata attached to the uploaded manifest object.
+    s3_metadata: Dict[str, Any] = {"Metadata": {}}
+    s3_metadata["Metadata"]["file-system-location-name"] = manifest_file
+
+    # Always upload the manifest file to the CAS root /Manifests folder with the original file name.
+    manifest_path: str = "/".join(
+        [s3_cas_prefix, S3_MANIFEST_FOLDER_NAME, s3_key_prefix, Path(manifest_file).name]
+        if s3_key_prefix
+        else [s3_cas_prefix, S3_MANIFEST_FOLDER_NAME, Path(manifest_file).name]
+    )
+
+    # S3 uploader.
+    upload = S3AssetUploader(session=boto_session)
+    with open(manifest_file) as manifest:
+        upload.upload_bytes_to_s3(
+            bytes=BytesIO(manifest.read().encode("utf-8")),
+            bucket=s3_bucket_name,
+            key=manifest_path,
+            progress_handler=logger.echo,
+            extra_args=s3_metadata,
+        )
+
+
+def _manifest_download(
+    download_dir: str,
+    farm_id: str,
+    queue_id: str,
+    job_id: str,
+    boto3_session: boto3.Session,
+    step_id: Optional[str] = None,
+    logger: ClickLogger = ClickLogger(False),
+) -> ManifestDownloadResponse:
+    """
+    BETA API - This API is still evolving but will be made public in the near future.
+    API to download the Job Attachment manifest for a Job, and optionally the dependencies for a Step.
+    download_dir: Download directory.
+    farm_id: The Deadline Farm to download from.
+    queue_id: The Deadline Queue to download from.
+    job_id: Job Id to download.
+    boto3_session: Boto3 session.
+    step_id: [Optional] Step Id; also download the output manifests of the step's dependencies.
+    logger: Click Logger instance to print to CLI as text or JSON.
+    return: ManifestDownloadResponse with the downloaded manifest data; contains each manifest root and local download path.
+    """
+
+    # Deadline Client and get the Queue to download.
+    deadline = boto3_session.client("deadline", config=get_default_client_config())
+
+    queue: dict = deadline.get_queue(
+        farmId=farm_id,
+        queueId=queue_id,
+    )
+
+    # Assume the queue role for session permissions.
+    queue_role_session: boto3.Session = _get_queue_user_boto3_session(
+        deadline=deadline,
+        base_session=boto3_session,
+        farm_id=farm_id,
+        queue_id=queue_id,
+        queue_display_name=queue["displayName"],
+    )
+
+    # Queue's Job Attachment settings.
+    queue_s3_settings = JobAttachmentS3Settings(**queue["jobAttachmentSettings"])
+
+    # Get the S3 manifest prefix.
+    s3_prefix: Path = Path(queue_s3_settings.rootPrefix, S3_MANIFEST_FOLDER_NAME)
+
+    # Capture a list of successfully downloaded manifests for JSON output.
+    successful_downloads: List[ManifestDownload] = []
+
+    # Utility function to build up manifests by root.
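+    # A root may accumulate several manifests (job inputs plus step-step outputs); they are merged per root below.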
+    manifests_by_root: Dict[str, List[BaseAssetManifest]] = dict()
+
+    def add_manifest_by_root(
+        manifests_by_root: Dict[str, list], root: str, manifest: BaseAssetManifest
+    ):
+        if root not in manifests_by_root:
+            manifests_by_root[root] = []
+        manifests_by_root[root].append(manifest)
+
+    # Get input_manifest_paths from Deadline GetJob API
+    job: dict = deadline.get_job(farmId=farm_id, queueId=queue_id, jobId=job_id)
+    attachments: dict = job["attachments"] if "attachments" in job else {}
+    input_manifest_paths: List[Tuple[str, str]] = [
+        (manifest.get("inputManifestPath", ""), manifest["rootPath"])
+        for manifest in attachments.get("manifests", [])
+    ]
+
+    # Download each input_manifest_path
+    for input_manifest_path, root_path in input_manifest_paths:
+        asset_manifest: BaseAssetManifest = get_manifest_from_s3(
+            manifest_key=(s3_prefix / input_manifest_path).as_posix(),
+            s3_bucket=queue_s3_settings.s3BucketName,
+            session=queue_role_session,
+        )
+        if asset_manifest is not None:
+            logger.echo(f"Downloaded input manifest for root: {root_path}")
+            add_manifest_by_root(
+                manifests_by_root=manifests_by_root, root=root_path, manifest=asset_manifest
+            )
+
+    # Now handle step-step dependencies
+    if step_id is not None:
+        # Get Step-Step dependencies.
+        nextToken = ""
+        step_dep_response = deadline.list_step_dependencies(
+            farmId=farm_id,
+            queueId=queue_id,
+            jobId=job_id,
+            stepId=step_id,
+            nextToken=nextToken,
+        )
+        for dependent_step in step_dep_response["dependencies"]:
+            logger.echo(f"Found Step-Step dependency: {dependent_step['stepId']}")
+
+            # Get manifests for the step-step dependency
+            step_manifests_by_root: Dict[str, List[BaseAssetManifest]] = (
+                get_output_manifests_by_asset_root(
+                    s3_settings=queue_s3_settings,
+                    farm_id=farm_id,
+                    queue_id=queue_id,
+                    job_id=job_id,
+                    step_id=dependent_step["stepId"],
+                    session=queue_role_session,
+                )
+            )
+            # Merge all manifests by root.
+            for root in step_manifests_by_root.keys():
+                for manifest in step_manifests_by_root[root]:
+                    logger.echo(f"Found step-step output manifest for root: {root}")
+                    add_manifest_by_root(
+                        manifests_by_root=manifests_by_root, root=root, manifest=manifest
+                    )
+
+    # Finally, merge all manifest paths to create unified manifests.
+    # TODO: Filter outputs by path
+    merged_manifests: Dict[str, BaseAssetManifest] = {}
+    for root in manifests_by_root.keys():
+        merged_manifest = merge_asset_manifests(manifests_by_root[root])
+        if merged_manifest:
+            merged_manifests[root] = merged_manifest
+
+    # Save the manifest files to disk.
+    for root in merged_manifests.keys():
+        # Save the merged manifest as {root}-{hash}-{timestamp}.manifest.
+        root_hash: str = hash_data(
+            root.encode("utf-8"), merged_manifests[root].get_default_hash_alg()
+        )
+        timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
+        manifest_name = root.replace("/", "_")
+        manifest_name = manifest_name[1:] if manifest_name[0] == "_" else manifest_name
+        manifest_name = f"{manifest_name}-{root_hash}-{timestamp}.manifest"
+
+        local_manifest_file_path = os.path.join(download_dir, manifest_name)
+        with open(local_manifest_file_path, "w") as file:
+            file.write(merged_manifests[root].encode())
+        successful_downloads.append(
+            ManifestDownload(manifest_root=root, local_manifest_path=str(local_manifest_file_path))
+        )
+        logger.echo(f"Downloaded merged manifest for root: {root} to: {local_manifest_file_path}")
+
+    # JSON output at the end.
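+    # Serializes as {"downloaded": [{"manifest_root": ..., "local_manifest_path": ...}, ...]}.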
+ output = ManifestDownloadResponse(downloaded=successful_downloads) + return output diff --git a/src/deadline/job_attachments/models.py b/src/deadline/job_attachments/models.py index ba94c137..48a22be7 100644 --- a/src/deadline/job_attachments/models.py +++ b/src/deadline/job_attachments/models.py @@ -399,8 +399,8 @@ class ManifestDiff: class ManifestDownload: """Data structure to store the S3 and local paths of a manifest""" - s3_key: str = field(default_factory=str) - local: str = field(default_factory=str) + manifest_root: str = field(default_factory=str) + local_manifest_path: str = field(default_factory=str) @dataclass @@ -408,7 +408,6 @@ class ManifestDownloadResponse: """Data structure to capture the response for manifest download""" downloaded: list[ManifestDownload] = field(default_factory=list) - failed: list[str] = field(default_factory=list) @dataclass diff --git a/test/integ/cli/__init__.py b/test/integ/cli/__init__.py new file mode 100644 index 00000000..8d929cc8 --- /dev/null +++ b/test/integ/cli/__init__.py @@ -0,0 +1 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. diff --git a/test/integ/cli/conftest.py b/test/integ/cli/conftest.py new file mode 100644 index 00000000..662af5a7 --- /dev/null +++ b/test/integ/cli/conftest.py @@ -0,0 +1,171 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +from unittest.mock import MagicMock +import pytest +from deadline.job_attachments import upload +from deadline.job_attachments._aws.deadline import get_queue +from deadline.job_attachments.asset_manifests.hash_algorithms import HashAlgorithm, hash_file +from deadline.job_attachments.asset_manifests.versions import ManifestVersion +from .test_utils import JobAttachmentTest, UploadInputFilesOneAssetInCasOutputs + + +@pytest.fixture(scope="session") +def job_attachment_test( + tmp_path_factory: pytest.TempPathFactory, + request: pytest.FixtureRequest, +): + """ + Fixture to get the session's JobAttachmentTest object. + """ + + return JobAttachmentTest(tmp_path_factory, manifest_version=ManifestVersion.v2023_03_03) + + +@pytest.fixture(scope="session") +def upload_input_files_assets_not_in_cas(job_attachment_test: JobAttachmentTest): + """ + When no assets are in the CAS, make sure all files are uploaded. 
+    """
+    # IF
+
+    job_attachment_settings = get_queue(
+        farm_id=job_attachment_test.farm_id,
+        queue_id=job_attachment_test.queue_id,
+        deadline_endpoint_url=job_attachment_test.deadline_endpoint,
+    ).jobAttachmentSettings
+
+    if job_attachment_settings is None:
+        raise TypeError("Job attachment settings must be set for this test.")
+
+    asset_manager = upload.S3AssetManager(
+        farm_id=job_attachment_test.farm_id,
+        queue_id=job_attachment_test.queue_id,
+        job_attachment_settings=job_attachment_settings,
+        asset_manifest_version=job_attachment_test.manifest_version,
+    )
+
+    mock_on_preparing_to_submit = MagicMock(return_value=True)
+    mock_on_uploading_files = MagicMock(return_value=True)
+
+    # WHEN
+    upload_group = asset_manager.prepare_paths_for_upload(
+        input_paths=[str(job_attachment_test.SCENE_MA_PATH)],
+        output_paths=[str(job_attachment_test.OUTPUT_PATH)],
+        referenced_paths=[],
+    )
+    (_, manifests) = asset_manager.hash_assets_and_create_manifest(
+        asset_groups=upload_group.asset_groups,
+        total_input_files=upload_group.total_input_files,
+        total_input_bytes=upload_group.total_input_bytes,
+        hash_cache_dir=str(job_attachment_test.hash_cache_dir),
+        on_preparing_to_submit=mock_on_preparing_to_submit,
+    )
+    asset_manager.upload_assets(
+        manifests,
+        on_uploading_assets=mock_on_uploading_files,
+        s3_check_cache_dir=str(job_attachment_test.s3_cache_dir),
+    )
+
+    # THEN
+    scene_ma_s3_path = (
+        f"{job_attachment_settings.full_cas_prefix()}/{job_attachment_test.SCENE_MA_HASH}.xxh128"
+    )
+
+    object_summary_iterator = job_attachment_test.bucket.objects.filter(
+        Prefix=scene_ma_s3_path,
+    )
+
+    assert list(object_summary_iterator)[0].key == scene_ma_s3_path
+
+
+@pytest.fixture(scope="session")
+def upload_input_files_one_asset_in_cas(
+    job_attachment_test: JobAttachmentTest, upload_input_files_assets_not_in_cas: None
+) -> UploadInputFilesOneAssetInCasOutputs:
+    """
+    Test that when one asset is already in the CAS, every file except the one in the CAS is uploaded.
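+    Returns the resulting Attachments so dependent tests can submit jobs against the same CAS data.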
+ """ + # IF + job_attachment_settings = get_queue( + farm_id=job_attachment_test.farm_id, + queue_id=job_attachment_test.queue_id, + deadline_endpoint_url=job_attachment_test.deadline_endpoint, + ).jobAttachmentSettings + + if job_attachment_settings is None: + raise Exception("Job attachment settings must be set for this test.") + + asset_manager = upload.S3AssetManager( + farm_id=job_attachment_test.farm_id, + queue_id=job_attachment_test.queue_id, + job_attachment_settings=job_attachment_settings, + asset_manifest_version=job_attachment_test.manifest_version, + ) + + input_paths = [ + str(job_attachment_test.SCENE_MA_PATH), + str(job_attachment_test.BRICK_PNG_PATH), + str(job_attachment_test.CLOTH_PNG_PATH), + str(job_attachment_test.INPUT_IN_OUTPUT_DIR_PATH), + ] + + scene_ma_s3_path = ( + f"{job_attachment_settings.full_cas_prefix()}/{job_attachment_test.SCENE_MA_HASH}.xxh128" + ) + + # This file has already been uploaded + scene_ma_upload_time = job_attachment_test.bucket.Object(scene_ma_s3_path).last_modified + + mock_on_preparing_to_submit = MagicMock(return_value=True) + mock_on_uploading_files = MagicMock(return_value=True) + + # WHEN + upload_group = asset_manager.prepare_paths_for_upload( + input_paths=input_paths, + output_paths=[str(job_attachment_test.OUTPUT_PATH)], + referenced_paths=[], + ) + (_, manifests) = asset_manager.hash_assets_and_create_manifest( + asset_groups=upload_group.asset_groups, + total_input_files=upload_group.total_input_files, + total_input_bytes=upload_group.total_input_bytes, + hash_cache_dir=str(job_attachment_test.hash_cache_dir), + on_preparing_to_submit=mock_on_preparing_to_submit, + ) + + (_, attachments) = asset_manager.upload_assets( + manifests, + on_uploading_assets=mock_on_uploading_files, + s3_check_cache_dir=str(job_attachment_test.s3_cache_dir), + ) + + # THEN + brick_png_hash = hash_file(str(job_attachment_test.BRICK_PNG_PATH), HashAlgorithm.XXH128) + cloth_png_hash = hash_file(str(job_attachment_test.CLOTH_PNG_PATH), HashAlgorithm.XXH128) + input_in_output_dir_hash = hash_file( + str(job_attachment_test.INPUT_IN_OUTPUT_DIR_PATH), HashAlgorithm.XXH128 + ) + + brick_png_s3_path = f"{job_attachment_settings.full_cas_prefix()}/{brick_png_hash}.xxh128" + cloth_png_s3_path = f"{job_attachment_settings.full_cas_prefix()}/{cloth_png_hash}.xxh128" + input_in_output_dir_s3_path = ( + f"{job_attachment_settings.full_cas_prefix()}/{input_in_output_dir_hash}.xxh128" + ) + + object_summary_iterator = job_attachment_test.bucket.objects.filter( + Prefix=f"{job_attachment_settings.full_cas_prefix()}/", + ) + + s3_objects = {obj.key: obj for obj in object_summary_iterator} + + assert {brick_png_s3_path, cloth_png_s3_path, input_in_output_dir_s3_path} <= set( + map(lambda x: x.key, object_summary_iterator) + ) + + assert brick_png_s3_path in s3_objects + assert cloth_png_s3_path in s3_objects + assert input_in_output_dir_s3_path in s3_objects + # Make sure that the file hasn't been modified/reuploaded + assert s3_objects[scene_ma_s3_path].last_modified == scene_ma_upload_time + + return UploadInputFilesOneAssetInCasOutputs(attachments) diff --git a/test/integ/cli/test_cli_manifest.py b/test/integ/cli/test_cli_manifest.py deleted file mode 100644 index bfd31a68..00000000 --- a/test/integ/cli/test_cli_manifest.py +++ /dev/null @@ -1,156 +0,0 @@ -# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - -""" -Integ tests for the CLI asset commands. 
-""" -import os -import json -from click.testing import CliRunner -import pytest -import tempfile -from deadline.job_attachments.asset_manifests.hash_algorithms import hash_file, HashAlgorithm - -from deadline.client.cli import main - - -TEST_FILE_CONTENT = "test file content" -TEST_SUB_DIR_FILE_CONTENT = "subdir file content" -TEST_ROOT_DIR_FILE_CONTENT = "root file content" - -TEST_ROOT_FILE = "root_file.txt" -TEST_SUB_FILE = "subdir_file.txt" - -TEST_ROOT_DIR = "root_dir" -TEST_MANIFEST_DIR = "manifest_dir" -TEST_SUB_DIR_1 = "subdir1" -TEST_SUB_DIR_2 = "subdir2" - - -class TestManifestSnapshot: - - @pytest.fixture - def temp_dir(self): - with tempfile.TemporaryDirectory() as tmpdir_path: - yield tmpdir_path - - def test_snapshot_basic(self, temp_dir): - """ - Snapshot with a valid root directory containing one file, and no other parameters. Basic test the CLI calls into the API. - Deeper testing is done at the API layer. - """ - # Given - root_dir = os.path.join(temp_dir, TEST_ROOT_DIR) - os.makedirs(root_dir) - manifest_dir = os.path.join(temp_dir, TEST_MANIFEST_DIR) - os.makedirs(manifest_dir) - file_path = os.path.join(root_dir, TEST_ROOT_FILE) - with open(file_path, "w") as f: - f.write(TEST_FILE_CONTENT) - - # When - runner = CliRunner() - result = runner.invoke( - main, - [ - "manifest", - "snapshot", - "--root", - root_dir, - "--destination", - manifest_dir, - "--name", - "test", - ], - ) - assert result.exit_code == 0, f"Non-Zeo exit code, CLI output {result.output}" - - # Then - # Check manifest file details to match correct content - manifest_files = os.listdir(manifest_dir) - assert ( - len(manifest_files) == 1 - ), f"Expected exactly one manifest file, but got {len(manifest_files)}" - manifest_file_name = manifest_files[0] - assert ( - "test" in manifest_file_name - ), f"Expected test in manifest file name, got {manifest_file_name}" - - manifest_file_path = os.path.join(manifest_dir, manifest_file_name) - - with open(manifest_file_path, "r") as f: - manifest_data = json.load(f) - - expected_hash = hash_file(file_path, HashAlgorithm("xxh128")) # hashed with xxh128 - manifest_data_paths = manifest_data["paths"] - assert ( - len(manifest_data_paths) == 1 - ), f"Expected exactly one path inside manifest, but got {len(manifest_data_paths)}" - assert manifest_data_paths[0]["path"] == TEST_ROOT_FILE - assert manifest_data_paths[0]["hash"] == expected_hash - - def test_snapshot_json(self, temp_dir): - """ - Snapshot with a valid root directory containing one file, and no other parameters. Basic test the CLI calls into the API. - Deeper testing is done at the API layer. 
- """ - - TEST_FILE_CONTENT = "test file content" - TEST_ROOT_FILE = "root_file.txt" - TEST_ROOT_DIR = "root_dir" - TEST_MANIFEST_DIR = "manifest_dir" - - # Given - root_dir = os.path.join(temp_dir, TEST_ROOT_DIR) - os.makedirs(root_dir) - manifest_dir = os.path.join(temp_dir, TEST_MANIFEST_DIR) - os.makedirs(manifest_dir) - file_path = os.path.join(root_dir, TEST_ROOT_FILE) - with open(file_path, "w") as f: - f.write(TEST_FILE_CONTENT) - - # When - runner = CliRunner() - result = runner.invoke( - main, - [ - "manifest", - "snapshot", - "--root", - root_dir, - "--destination", - manifest_dir, - "--name", - "test", - "--json", - ], - ) - assert result.exit_code == 0, f"Non-Zeo exit code, CLI output {result.output}" - - # Then - # Check manifest file details to match correct content - manifest_files = os.listdir(manifest_dir) - assert ( - len(manifest_files) == 1 - ), f"Expected exactly one manifest file, but got {len(manifest_files)}" - manifest_file_name = manifest_files[0] - assert ( - "test" in manifest_file_name - ), f"Expected test in manifest file name, got {manifest_file_name}" - - manifest_file_path = os.path.join(manifest_dir, manifest_file_name) - - with open(manifest_file_path, "r") as f: - manifest_data = json.load(f) - - expected_hash = hash_file(file_path, HashAlgorithm("xxh128")) # hashed with xxh128 - manifest_data_paths = manifest_data["paths"] - assert ( - len(manifest_data_paths) == 1 - ), f"Expected exactly one path inside manifest, but got {len(manifest_data_paths)}" - assert manifest_data_paths[0]["path"] == TEST_ROOT_FILE - assert manifest_data_paths[0]["hash"] == expected_hash - - # Since this is JSON, check that we can parse the json - manifest_output = json.loads(result.output) - assert "manifest" in manifest_output["manifest"] - assert manifest_output["manifest"][0] in manifest_file_path diff --git a/test/integ/cli/test_cli_manifest_download.py b/test/integ/cli/test_cli_manifest_download.py new file mode 100644 index 00000000..04355f96 --- /dev/null +++ b/test/integ/cli/test_cli_manifest_download.py @@ -0,0 +1,275 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +""" +Integ tests for the CLI manifest download commands. +""" +import json +import os +import tempfile +import time +from typing import List +import pytest +from click.testing import CliRunner + +from deadline.client.cli import main +from deadline.job_attachments._utils import _float_to_iso_datetime_string +from deadline.job_attachments.asset_manifests.base_manifest import BaseAssetManifest +from deadline.job_attachments.asset_manifests.decode import decode_manifest +from deadline.job_attachments.asset_manifests.hash_algorithms import HashAlgorithm +from deadline.job_attachments.asset_manifests.v2023_03_03.asset_manifest import ( + AssetManifest, + ManifestPath, +) +from deadline.job_attachments.asset_sync import AssetSync +from .test_utils import JobAttachmentTest, UploadInputFilesOneAssetInCasOutputs + + +@pytest.mark.integ +class TestManifestDownload: + + @pytest.fixture + def temp_dir(self): + with tempfile.TemporaryDirectory() as tmpdir_path: + yield tmpdir_path + + def _setup_create_job( + self, + upload_input_files_one_asset_in_cas: UploadInputFilesOneAssetInCasOutputs, + job_template: str, + job_attachment_test: JobAttachmentTest, + ) -> str: + """ + Create a job with the provided template and wait for the job to be created. + """ + farm_id: str = job_attachment_test.farm_id + queue_id: str = job_attachment_test.queue_id + + # Setup failure for the test. 
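+        # Fail fast if the test environment did not supply a farm and queue.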
+        assert farm_id
+        assert queue_id
+
+        # Create a job w/ CAS data already created.
+        job_response = job_attachment_test.deadline_client.create_job(
+            farmId=farm_id,
+            queueId=queue_id,
+            attachments=upload_input_files_one_asset_in_cas.attachments.to_dict(),  # type: ignore
+            targetTaskRunStatus="SUSPENDED",
+            template=job_template,
+            templateType="JSON",
+            priority=50,
+        )
+
+        job_id: str = job_response["jobId"]
+
+        # Wait for the job to be created.
+        waiter = job_attachment_test.deadline_client.get_waiter("job_create_complete")
+        waiter.wait(
+            jobId=job_id,
+            queueId=job_attachment_test.queue_id,
+            farmId=job_attachment_test.farm_id,
+        )
+
+        # Return the created Job ID.
+        return job_id
+
+    def _sync_mock_output_file(
+        self,
+        job_attachment_test: JobAttachmentTest,
+        job_id: str,
+        first_step_name: str,
+        second_step_name: str,
+        asset_root_path: str,
+    ) -> str:
+        """
+        Create a fake manifest file, upload it as a step output, and return the ID of the dependent step.
+        job_attachment_test: JobAttachmentTest test harness.
+        job_id: The ID of the job to attach outputs to.
+        first_step_name: Name of the step that produces the fake output.
+        second_step_name: Name of the step that depends on the first step.
+        asset_root_path: Asset root to upload an output file for.
+        """
+        list_steps_response = job_attachment_test.deadline_client.list_steps(
+            farmId=job_attachment_test.farm_id,
+            queueId=job_attachment_test.queue_id,
+            jobId=job_id,
+        )
+
+        # Find the IDs of the steps:
+        step_ids = {step["name"]: step["stepId"] for step in list_steps_response["steps"]}
+        first_step_id = step_ids[first_step_name]
+        second_step_id = step_ids[second_step_name]
+
+        # Get the task of the first step so we can upload a fake manifest.
+        first_step_first_task_id = job_attachment_test.deadline_client.list_tasks(
+            farmId=job_attachment_test.farm_id,
+            queueId=job_attachment_test.queue_id,
+            jobId=job_id,
+            stepId=first_step_id,
+        )["tasks"][0]["taskId"]
+
+        assert first_step_first_task_id is not None
+
+        # Create a fake manifest as output and upload it to S3.
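+        # The manifest lists two made-up files under the asset root, mimicking worker-produced output.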
+        asset_sync = AssetSync(job_attachment_test.farm_id)
+
+        output_manifest = AssetManifest(
+            hash_alg=HashAlgorithm("xxh128"),
+            total_size=10,
+            paths=[
+                ManifestPath(path="output_file", hash="a", size=1, mtime=167907934333848),
+                ManifestPath(
+                    path="output/nested_output_file", hash="b", size=1, mtime=1479079344833848
+                ),
+            ],
+        )
+
+        session_action_id_with_time_stamp = (
+            f"{_float_to_iso_datetime_string(time.time())}_session-86231a00283449158900410c7d58051e"
+        )
+        full_output_prefix = job_attachment_test.job_attachment_settings.full_output_prefix(
+            farm_id=job_attachment_test.farm_id,
+            queue_id=job_attachment_test.queue_id,
+            job_id=job_id,
+            step_id=first_step_id,
+            task_id=first_step_first_task_id,
+            session_action_id=session_action_id_with_time_stamp,
+        )
+        asset_sync._upload_output_manifest_to_s3(
+            s3_settings=job_attachment_test.job_attachment_settings,
+            output_manifest=output_manifest,
+            full_output_prefix=full_output_prefix,
+            root_path=asset_root_path,
+        )
+        return second_step_id
+
+    @pytest.mark.parametrize(
+        "json_output",
+        [
+            pytest.param(True),
+            pytest.param(False),
+        ],
+    )
+    def test_manifest_download_job(
+        self,
+        temp_dir: str,
+        json_output: bool,
+        upload_input_files_one_asset_in_cas: UploadInputFilesOneAssetInCasOutputs,
+        default_job_template: str,
+        job_attachment_test: JobAttachmentTest,
+    ):
+        # Given:
+        # Create a job
+        job_id: str = self._setup_create_job(
+            upload_input_files_one_asset_in_cas, default_job_template, job_attachment_test
+        )
+
+        # When
+        runner = CliRunner()
+        # Download for farm, queue, job to temp dir.
+        args = [
+            "manifest",
+            "download",
+            "--farm-id",
+            job_attachment_test.farm_id,
+            "--queue-id",
+            job_attachment_test.queue_id,
+            "--job-id",
+            job_id,
+            temp_dir,
+        ]
+        if json_output:
+            args.append("--json")
+        result = runner.invoke(main, args)
+
+        # Then
+        assert (
+            result.exit_code == 0
+        ), f"{result.output}, {job_attachment_test.farm_id}, {job_attachment_test.queue_id}"
+        if json_output:
+            # If JSON mode was specified, make sure the output is JSON and contains the downloaded manifest file.
+            download = json.loads(result.output)
+            assert download is not None
+            assert len(download["downloaded"]) == 1
+
+            # With JSON mode, we can also check the manifest file itself.
+            with open(download["downloaded"][0]["local_manifest_path"]) as manifest_file:
+                manifest: BaseAssetManifest = decode_manifest(manifest_file.read())
+                assert manifest is not None
+
+                # Create a list of files we know should be in the input paths.
+                files: List[str] = [path.path for path in manifest.paths]
+                assert os.path.join("inputs", "textures", "brick.png") in files
+                assert os.path.join("inputs", "textures", "cloth.png") in files
+                assert os.path.join("inputs", "scene.ma") in files
+
+    @pytest.mark.parametrize(
+        "json_output",
+        [
+            pytest.param(True),
+            pytest.param(False),
+        ],
+    )
+    def test_manifest_download_job_step_dependency(
+        self,
+        temp_dir: str,
+        json_output: bool,
+        upload_input_files_one_asset_in_cas: UploadInputFilesOneAssetInCasOutputs,
+        default_job_template_step_step_dependency: str,
+        job_attachment_test: JobAttachmentTest,
+    ):
+
+        # Create a job with a step-step dependency.
+        job_id: str = self._setup_create_job(
+            upload_input_files_one_asset_in_cas,
+            default_job_template_step_step_dependency,
+            job_attachment_test,
+        )
+
+        # Upload a dependent task output manifest.
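+        # custom-step's uploaded output becomes input to custom-step-2 through the step-step dependency.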
+        asset_root_path: str = upload_input_files_one_asset_in_cas.attachments.manifests[0].rootPath
+        second_step_id: str = self._sync_mock_output_file(
+            job_attachment_test, job_id, "custom-step", "custom-step-2", asset_root_path
+        )
+
+        # When
+        runner = CliRunner()
+        # Download for farm, queue, job to temp dir.
+        args = [
+            "manifest",
+            "download",
+            "--farm-id",
+            job_attachment_test.farm_id,
+            "--queue-id",
+            job_attachment_test.queue_id,
+            "--job-id",
+            job_id,
+            "--step-id",
+            second_step_id,
+            temp_dir,
+        ]
+        if json_output:
+            args.append("--json")
+        result = runner.invoke(main, args)
+
+        # Then
+        assert (
+            result.exit_code == 0
+        ), f"{result.output}, {job_attachment_test.farm_id}, {job_attachment_test.queue_id}"
+        if json_output:
+            # If JSON mode was specified, make sure the output is JSON and contains the downloaded manifest file.
+            download = json.loads(result.output)
+            assert download is not None
+            assert len(download["downloaded"]) == 1
+
+            # With JSON mode, we can also check the manifest file itself.
+            with open(download["downloaded"][0]["local_manifest_path"]) as manifest_file:
+                manifest: BaseAssetManifest = decode_manifest(manifest_file.read())
+                assert manifest is not None
+
+                # Create a list of files we know should be in the input paths.
+                files: List[str] = [path.path for path in manifest.paths]
+                assert os.path.join("inputs", "textures", "brick.png") in files
+                assert os.path.join("inputs", "textures", "cloth.png") in files
+                assert os.path.join("inputs", "scene.ma") in files
+                assert os.path.join("output_file") in files
+                assert os.path.join("output", "nested_output_file") in files
diff --git a/test/integ/cli/test_cli_manifest_upload.py b/test/integ/cli/test_cli_manifest_upload.py
new file mode 100644
index 00000000..6a2fb67c
--- /dev/null
+++ b/test/integ/cli/test_cli_manifest_upload.py
@@ -0,0 +1,140 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+"""
+Integ tests for the CLI manifest upload commands.
+"""
+import os
+from pathlib import Path
+from typing import Optional
+import boto3
+from click.testing import CliRunner
+from deadline.client.cli._groups.manifest_group import cli_manifest
+from deadline.job_attachments.api.manifest import _manifest_snapshot
+from deadline.job_attachments.models import ManifestSnapshot
+import pytest
+import tempfile
+from botocore.exceptions import ClientError
+
+from deadline.client.cli import main
+
+
+TEST_FILE_CONTENT = "test file content"
+TEST_SUB_DIR_FILE_CONTENT = "subdir file content"
+TEST_ROOT_DIR_FILE_CONTENT = "root file content"
+
+TEST_ROOT_FILE = "root_file.txt"
+TEST_SUB_FILE = "subdir_file.txt"
+
+TEST_ROOT_DIR = "root_dir"
+TEST_MANIFEST_DIR = "manifest_dir"
+TEST_SUB_DIR_1 = "subdir1"
+TEST_SUB_DIR_2 = "subdir2"
+
+
+@pytest.mark.integ
+class TestManifestUpload:
+
+    @pytest.fixture
+    def temp_dir(self):
+        with tempfile.TemporaryDirectory() as tmpdir_path:
+            yield tmpdir_path
+
+    def create_manifest_file(self, temp_dir) -> str:
+        """
+        Create a test manifest file, and return the full path for testing.
+        :return path to the test file.
+        """
+
+        # Given a snapshot file:
+        test_file_name = "test_file"
+        test_file = os.path.join(temp_dir, test_file_name)
+        os.makedirs(os.path.dirname(test_file), exist_ok=True)
+        with open(test_file, "w") as f:
+            f.write("testing123")
+
+        # When
+        manifest: Optional[ManifestSnapshot] = _manifest_snapshot(
+            root=temp_dir, destination=temp_dir, name="test"
+        )
+
+        # Then
+        assert manifest is not None
+        assert manifest.manifest is not None
+        return manifest.manifest
+
+    def test_manifest_upload(self, temp_dir):
+        """
+        Simple test to generate a manifest, and then call the upload CLI to upload it to S3.
+        The test verifies the manifest is uploaded by doing an S3 head_object call.
+        """
+
+        # Given a snapshot file:
+        manifest_file = self.create_manifest_file(temp_dir)
+        manifest_file_name = Path(manifest_file).name
+
+        # Now that we have a manifest file, execute the CLI and upload it to S3.
+        # The manifest file name is unique, so it will not collide with prior test runs.
+        s3_bucket = os.environ.get("JOB_ATTACHMENTS_BUCKET")
+        runner = CliRunner()
+        # Temporary, always add cli_manifest until launched.
+        main.add_command(cli_manifest)
+        result = runner.invoke(
+            main,
+            [
+                "manifest",
+                "upload",
+                "--s3-cas-uri",
+                f"s3://{s3_bucket}/DeadlineCloud",
+                manifest_file,
+            ],
+        )
+        assert result.exit_code == 0, f"Non-zero exit code, CLI output {result.output}"
+
+        # Then validate the Manifest file is uploaded to S3 by checking the file actually exists.
+        manifest_s3_path = f"DeadlineCloud/Manifests/{manifest_file_name}"
+        s3_client = boto3.client("s3")
+        s3_client.head_object(Bucket=s3_bucket, Key=manifest_s3_path)
+
+        # Cleanup.
+        s3_client.delete_object(Bucket=s3_bucket, Key=manifest_s3_path)
+
+    def test_manifest_upload_by_farm_queue(self, temp_dir):
+        """
+        Simple test to generate a manifest, and then call the upload CLI to upload it to S3.
+        This test case uses --farm-id and --queue-id.
+        The test verifies the manifest is uploaded by doing an S3 head_object call.
+        """
+
+        # Given a snapshot file:
+        manifest_file = self.create_manifest_file(temp_dir)
+        manifest_file_name = Path(manifest_file).name
+
+        # Now that we have a manifest file, execute the CLI and upload it to S3.
+        # The manifest file name is unique, so it will not collide with prior test runs.
+        s3_bucket = os.environ.get("JOB_ATTACHMENTS_BUCKET", "")
+        runner = CliRunner()
+        # Temporary, always add cli_manifest until launched.
+        main.add_command(cli_manifest)
+        result = runner.invoke(
+            main,
+            [
+                "manifest",
+                "upload",
+                "--farm-id",
+                os.environ.get("FARM_ID", ""),
+                "--queue-id",
+                os.environ.get("QUEUE_ID", ""),
+                manifest_file,
+            ],
+        )
+        assert result.exit_code == 0, f"Non-zero exit code, CLI output {result.output}"
+
+        # Then validate the Manifest file is uploaded to S3 by checking the file actually exists.
+        manifest_s3_path = f"DeadlineCloud/Manifests/{manifest_file_name}"
+        s3_client = boto3.client("s3")
+        try:
+            s3_client.head_object(Bucket=s3_bucket, Key=manifest_s3_path)
+        except ClientError:
+            pytest.fail(f"File not found at {s3_bucket}, {manifest_s3_path}")
+
+        # Cleanup.
+        s3_client.delete_object(Bucket=s3_bucket, Key=manifest_s3_path)
diff --git a/test/integ/cli/test_data/inputs/scene.ma b/test/integ/cli/test_data/inputs/scene.ma
new file mode 100644
index 00000000..b2bad0ef
--- /dev/null
+++ b/test/integ/cli/test_data/inputs/scene.ma
@@ -0,0 +1 @@
+this is a scene file
\ No newline at end of file
diff --git a/test/integ/cli/test_data/inputs/textures/brick.png b/test/integ/cli/test_data/inputs/textures/brick.png
new file mode 100644
index 00000000..90584e64
--- /dev/null
+++ b/test/integ/cli/test_data/inputs/textures/brick.png
@@ -0,0 +1 @@
+this is a brick png
\ No newline at end of file
diff --git a/test/integ/cli/test_data/inputs/textures/cloth.png b/test/integ/cli/test_data/inputs/textures/cloth.png
new file mode 100644
index 00000000..5b9f8b5c
--- /dev/null
+++ b/test/integ/cli/test_data/inputs/textures/cloth.png
@@ -0,0 +1 @@
+this is a png of a cloth
\ No newline at end of file
diff --git a/test/integ/cli/test_data/outputs/not_for_sync_outputs.txt b/test/integ/cli/test_data/outputs/not_for_sync_outputs.txt
new file mode 100644
index 00000000..d6cfdd1f
--- /dev/null
+++ b/test/integ/cli/test_data/outputs/not_for_sync_outputs.txt
@@ -0,0 +1 @@
+Although it is in the output directory, it is actually an input file. It should be downloaded (to the worker's session working directory) during sync_inputs, and should not be captured as an output file when sync_outputs.
\ No newline at end of file
diff --git a/test/integ/cli/test_utils.py b/test/integ/cli/test_utils.py
new file mode 100644
index 00000000..4af5f89d
--- /dev/null
+++ b/test/integ/cli/test_utils.py
@@ -0,0 +1,64 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+from dataclasses import dataclass
+from pathlib import Path
+import boto3
+from pytest import TempPathFactory
+import os
+
+from deadline.job_attachments._aws.deadline import get_queue
+from deadline.job_attachments.asset_manifests.hash_algorithms import HashAlgorithm, hash_file
+from deadline.job_attachments.asset_manifests.versions import ManifestVersion
+from deadline.job_attachments.models import Attachments, JobAttachmentS3Settings
+from deadline_test_fixtures.deadline import DeadlineClient
+
+
+class JobAttachmentTest:
+    """
+    Hold information used across all job attachment integration tests.
+    """
+
+    ASSET_ROOT = Path(__file__).parent / "test_data"
+    OUTPUT_PATH = ASSET_ROOT / "outputs"
+    INPUT_PATH = ASSET_ROOT / "inputs"
+    SCENE_MA_PATH = INPUT_PATH / "scene.ma"
+    SCENE_MA_HASH = hash_file(str(SCENE_MA_PATH), HashAlgorithm.XXH128)
+    BRICK_PNG_PATH = INPUT_PATH / "textures" / "brick.png"
+    CLOTH_PNG_PATH = INPUT_PATH / "textures" / "cloth.png"
+    INPUT_IN_OUTPUT_DIR_PATH = OUTPUT_PATH / "not_for_sync_outputs.txt"
+
+    def __init__(
+        self,
+        tmp_path_factory: TempPathFactory,
+        manifest_version: ManifestVersion,
+    ):
+        """
+        Sets up resources that these integration tests will need.
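+        Assumes the FARM_ID, QUEUE_ID, and JOB_ATTACHMENTS_BUCKET environment variables are set.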
+ """ + + self.farm_id: str = os.environ.get("FARM_ID", "") + self.queue_id: str = os.environ.get("QUEUE_ID", "") + + self.bucket = boto3.resource("s3").Bucket(os.environ.get("JOB_ATTACHMENTS_BUCKET", "")) + self.deadline_client = DeadlineClient(boto3.client("deadline")) + + self.hash_cache_dir = tmp_path_factory.mktemp("hash_cache") + self.s3_cache_dir = tmp_path_factory.mktemp("s3_check_cache") + self.session = boto3.Session() + self.deadline_endpoint = os.getenv( + "AWS_ENDPOINT_URL_DEADLINE", + f"https://deadline.{self.session.region_name}.amazonaws.com", + ) + + self.job_attachment_settings: JobAttachmentS3Settings = get_queue( + farm_id=self.farm_id, + queue_id=self.queue_id, + deadline_endpoint_url=self.deadline_endpoint, + ).jobAttachmentSettings # type: ignore[union-attr,assignment] + + self.manifest_version = manifest_version + + +@dataclass +class UploadInputFilesOneAssetInCasOutputs: + attachments: Attachments diff --git a/test/integ/conftest.py b/test/integ/conftest.py index c651a896..b33e9bc2 100644 --- a/test/integ/conftest.py +++ b/test/integ/conftest.py @@ -1,5 +1,6 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +import json import os import pytest @@ -10,3 +11,144 @@ def external_bucket() -> str: Return a bucket that all developers and test accounts have access to, but isn't in the testers account. """ return os.environ.get("INTEG_TEST_JA_CROSS_ACCOUNT_BUCKET", "job-attachment-bucket-snipe-test") + + +@pytest.fixture(scope="session") +def default_job_template() -> str: + """ + A generic job template with 2 steps. First step has 2 tasks and the second step has 1 task. + """ + return json.dumps( + { + "name": "custom-job", + "specificationVersion": "jobtemplate-2023-09", + "steps": [ + { + "name": "custom-step", + "parameterSpace": { + "taskParameterDefinitions": [ + {"name": "frame", "type": "INT", "range": ["0", "1"]} + ] + }, + "script": { + "actions": {"onRun": {"command": "{{ Task.File.run }}"}}, + "embeddedFiles": [ + { + "name": "run", + "data": "#!/bin/env bash\n" "set -ex\n" "echo 'First Step'", + "runnable": True, + "type": "TEXT", + } + ], + }, + }, + { + "name": "custom-step-2", + "parameterSpace": { + "taskParameterDefinitions": [ + {"name": "frame", "type": "INT", "range": ["0"]} + ] + }, + "script": { + "actions": {"onRun": {"command": "{{ Task.File.run }}"}}, + "embeddedFiles": [ + { + "name": "run", + "data": "#!/bin/env bash\n" "set -ex\n" "echo 'Second step'", + "runnable": True, + "type": "TEXT", + } + ], + }, + }, + ], + } + ) + + +@pytest.fixture(scope="session") +def default_job_template_step_step_dependency() -> str: + """ + A generic job template with 2 steps. Second step depends on first step. Both steps have 1 task. 
+ """ + return json.dumps( + { + "name": "custom-step-step-job", + "specificationVersion": "jobtemplate-2023-09", + "steps": [ + { + "name": "custom-step", + "parameterSpace": { + "taskParameterDefinitions": [ + {"name": "frame", "type": "INT", "range": ["0"]} + ] + }, + "script": { + "actions": {"onRun": {"command": "{{ Task.File.run }}"}}, + "embeddedFiles": [ + { + "name": "run", + "data": "#!/bin/env bash\n" "set -ex\n" "echo 'First Step'", + "runnable": True, + "type": "TEXT", + } + ], + }, + }, + { + "name": "custom-step-2", + "dependencies": [{"dependsOn": "custom-step"}], + "parameterSpace": { + "taskParameterDefinitions": [ + {"name": "frame", "type": "INT", "range": ["0"]} + ] + }, + "script": { + "actions": {"onRun": {"command": "{{ Task.File.run }}"}}, + "embeddedFiles": [ + { + "name": "run", + "data": "#!/bin/env bash\n" "set -ex\n" "echo 'Second step'", + "runnable": True, + "type": "TEXT", + } + ], + }, + }, + ], + } + ) + + +@pytest.fixture() +def default_job_template_one_task_one_step() -> str: + """ + A generic job template with one step and one task. + """ + return json.dumps( + { + "name": "custom-job", + "specificationVersion": "jobtemplate-2023-09", + "steps": [ + { + "name": "custom-step", + "parameterSpace": { + "taskParameterDefinitions": [ + {"name": "frame", "type": "INT", "range": ["0"]} + ] + }, + "script": { + "actions": {"onRun": {"command": "{{ Task.File.run }}"}}, + "embeddedFiles": [ + { + "name": "run", + "data": "#!/bin/env bash\n" "set -ex\n" "echo 'First Step'", + "runnable": True, + "type": "TEXT", + } + ], + }, + }, + ], + } + ) diff --git a/test/integ/deadline_job_attachments/conftest.py b/test/integ/deadline_job_attachments/conftest.py index 6a38a759..768c78d7 100644 --- a/test/integ/deadline_job_attachments/conftest.py +++ b/test/integ/deadline_job_attachments/conftest.py @@ -1,96 +1,7 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. import getpass -import json import sys -import pytest - - -@pytest.fixture(scope="session") -def default_job_template() -> str: - """ - A generic job template with 2 steps. First step has 2 tasks and the second step has 1 task. - """ - return json.dumps( - { - "name": "custom-job", - "specificationVersion": "jobtemplate-2023-09", - "steps": [ - { - "name": "custom-step", - "parameterSpace": { - "taskParameterDefinitions": [ - {"name": "frame", "type": "INT", "range": ["0", "1"]} - ] - }, - "script": { - "actions": {"onRun": {"command": "{{ Task.File.run }}"}}, - "embeddedFiles": [ - { - "name": "run", - "data": "#!/bin/env bash\n" "set -ex\n" "echo 'First Step'", - "runnable": True, - "type": "TEXT", - } - ], - }, - }, - { - "name": "custom-step-2", - "parameterSpace": { - "taskParameterDefinitions": [ - {"name": "frame", "type": "INT", "range": ["0"]} - ] - }, - "script": { - "actions": {"onRun": {"command": "{{ Task.File.run }}"}}, - "embeddedFiles": [ - { - "name": "run", - "data": "#!/bin/env bash\n" "set -ex\n" "echo 'Second step'", - "runnable": True, - "type": "TEXT", - } - ], - }, - }, - ], - } - ) - - -@pytest.fixture() -def default_job_template_one_task_one_step() -> str: - """ - A generic job template with one step and one task. 
-    """
-    return json.dumps(
-        {
-            "name": "custom-job",
-            "specificationVersion": "jobtemplate-2023-09",
-            "steps": [
-                {
-                    "name": "custom-step",
-                    "parameterSpace": {
-                        "taskParameterDefinitions": [
-                            {"name": "frame", "type": "INT", "range": ["0"]}
-                        ]
-                    },
-                    "script": {
-                        "actions": {"onRun": {"command": "{{ Task.File.run }}"}},
-                        "embeddedFiles": [
-                            {
-                                "name": "run",
-                                "data": "#!/bin/env bash\n" "set -ex\n" "echo 'First Step'",
-                                "runnable": True,
-                                "type": "TEXT",
-                            }
-                        ],
-                    },
-                },
-            ],
-        }
-    )
 
 
 def is_windows_non_admin():
diff --git a/test/unit/deadline_job_attachments/api/test_manifest_download.py b/test/unit/deadline_job_attachments/api/test_manifest_download.py
new file mode 100644
index 00000000..cbfd88f5
--- /dev/null
+++ b/test/unit/deadline_job_attachments/api/test_manifest_download.py
@@ -0,0 +1,80 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+import tempfile
+from typing import List
+from unittest.mock import MagicMock, patch
+import pytest
+
+from deadline.job_attachments.api.manifest import _manifest_download
+from deadline.job_attachments.models import ManifestDownloadResponse
+
+
+class TestManifestDownload:
+
+    @pytest.fixture
+    def temp_dir(self):
+        with tempfile.TemporaryDirectory() as tmpdir_path:
+            yield tmpdir_path
+
+    @patch("deadline.job_attachments.api.manifest._get_queue_user_boto3_session")
+    @patch("deadline.job_attachments.api.manifest.get_manifest_from_s3")
+    @patch("deadline.job_attachments.api.manifest.get_output_manifests_by_asset_root")
+    @pytest.mark.parametrize(
+        "job_manifests,step_manifests",
+        [
+            pytest.param([], []),
+            pytest.param([{"inputManifestPath": "s3://hello/world", "rootPath": "/some/root"}], []),
+            pytest.param([], [{"stepId": "step-123456"}]),
+            pytest.param(
+                [{"inputManifestPath": "s3://hello/world", "rootPath": "/some/root"}],
+                [{"stepId": "step-123456"}],
+            ),
+        ],
+    )
+    def test_download_job(
+        self,
+        mock_get_output_manifest: MagicMock,
+        mock_get_manifest_from_s3: MagicMock,
+        mock_queue_session: MagicMock,
+        job_manifests: List,
+        step_manifests: List,
+        temp_dir: str,
+    ) -> None:
+
+        # This is heavily mocked, so return nothing. Integration tests cover full manifest merging.
+        mock_get_manifest_from_s3.return_value = None
+        mock_get_output_manifest.return_value = {}
+
+        # Mock Boto
+        mock_boto_session = MagicMock()
+
+        # Mock Get Queue Credentials
+        mock_queue_session.return_value = MagicMock()
+
+        # Mock the Deadline client.
+ mock_deadline_client = MagicMock() + mock_boto_session.client.return_value = mock_deadline_client + + # Mock the result of get_queue + mock_deadline_client.get_queue.return_value = { + "displayName": "queue", + "jobAttachmentSettings": {"s3BucketName": "bucket", "rootPrefix": "root_prefix"}, + } + # Mock the result of get_job + mock_deadline_client.get_job.return_value = { + "name": "Mock Job", + "attachments": { + "manifests": job_manifests, + }, + } + # Mock the result of list_step_dependencies + mock_deadline_client.list_step_dependencies.return_value = {"dependencies": step_manifests} + + output: ManifestDownloadResponse = _manifest_download( + download_dir=temp_dir, + farm_id="farm-12345", + queue_id="queue-12345", + job_id="job-12345", + boto3_session=mock_boto_session, + ) + assert output is not None diff --git a/test/unit/deadline_job_attachments/api/test_manifest_upload.py b/test/unit/deadline_job_attachments/api/test_manifest_upload.py new file mode 100644 index 00000000..717e85e5 --- /dev/null +++ b/test/unit/deadline_job_attachments/api/test_manifest_upload.py @@ -0,0 +1,89 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + +import os +import tempfile +from unittest.mock import ANY, MagicMock, patch +import pytest + +from deadline.client import api + +from deadline.job_attachments.api.manifest import _manifest_upload + + +TEST_MANIFEST = '{"foo":"bar"}' +TEST_BUCKET_NAME = "s3://foobarbucket" +TEST_CAS_PREFIX = "in/a/galaxy" +TEST_KEY_PREFIX = "far/far/away" + + +class TestManifestUpload: + + @pytest.fixture + def temp_dir(self): + with tempfile.TemporaryDirectory() as tmpdir_path: + yield tmpdir_path + + @pytest.fixture + def mock_manifest_file(self, temp_dir) -> str: + """ + Create a Mock manifest file saved to the temp dir. + :return path to the test file. + """ + path = os.path.join(temp_dir, "test.manifest") + with open(path, "w") as manifest_file: + manifest_file.write(TEST_MANIFEST) + return path + + @patch("deadline.job_attachments.api.manifest.S3AssetUploader") + def test_upload(self, mock_upload_assets: MagicMock, mock_manifest_file: str) -> None: + """ + Upload is really simple. It is a pass through to S3AssetUploader. Make sure it is called correctly. + """ + # Given + boto_session = api.get_boto3_session() + + # When the API is called.... + _manifest_upload( + manifest_file=mock_manifest_file, + s3_bucket_name=TEST_BUCKET_NAME, + s3_cas_prefix=TEST_CAS_PREFIX, + boto_session=boto_session, + ) + + # Then + mock_upload_assets.return_value.upload_bytes_to_s3.assert_called_once_with( + bytes=ANY, + bucket=TEST_BUCKET_NAME, + key=TEST_CAS_PREFIX + "/Manifests/test.manifest", + progress_handler=ANY, + extra_args=ANY, + ) + + @patch("deadline.job_attachments.api.manifest.S3AssetUploader") + def test_upload_with_prefix( + self, mock_upload_assets: MagicMock, mock_manifest_file: str + ) -> None: + """ + Upload is really simple. It is a pass through to S3AssetUploader. Make sure it is called correctly with prefix + """ + # Given + boto_session = api.get_boto3_session() + + # When the API is called.... + _manifest_upload( + manifest_file=mock_manifest_file, + s3_bucket_name=TEST_BUCKET_NAME, + s3_cas_prefix=TEST_CAS_PREFIX, + s3_key_prefix=TEST_KEY_PREFIX, + boto_session=boto_session, + ) + + # Then + mock_upload_assets.return_value.upload_bytes_to_s3.assert_called_once_with( + bytes=ANY, + bucket=TEST_BUCKET_NAME, + key=TEST_CAS_PREFIX + "/Manifests/" + TEST_KEY_PREFIX + "/test.manifest", + progress_handler=ANY, + extra_args=ANY, + )