feat(JA): Add manifest upload and download to complete the JA standalone API+CLI featureset.

Signed-off-by: David Leong <[email protected]>
leongdl committed Oct 31, 2024
1 parent bffc055 commit b5b203b
Showing 17 changed files with 1,276 additions and 260 deletions.
115 changes: 105 additions & 10 deletions src/deadline/client/cli/_groups/manifest_group.py
@@ -9,21 +9,31 @@
"""
from __future__ import annotations

from configparser import ConfigParser
import dataclasses
import os
from typing import List
from typing import List, Optional
import boto3
import click

from deadline.client import api
from deadline.client.config import config_file
from deadline.job_attachments._diff import pretty_print_cli
from deadline.job_attachments.api.manifest import (
_glob_files,
_manifest_diff,
_manifest_download,
_manifest_snapshot,
_manifest_upload,
)
from deadline.job_attachments.models import (
S3_MANIFEST_FOLDER_NAME,
JobAttachmentS3Settings,
ManifestDiff,
)
from deadline.job_attachments.models import ManifestDiff

from ...exceptions import NonValidInputError
from .._common import _handle_error
from .._common import _apply_cli_options_to_config, _handle_error
from .click_logger import ClickLogger


@@ -228,28 +238,113 @@ def manifest_download(
"""
Downloads the input manifests of a previously submitted job.
"""
raise NotImplementedError("This CLI is being implemented.")
logger: ClickLogger = ClickLogger(is_json=json)
if not os.path.isdir(download_dir):
raise NonValidInputError(f"Specified destination directory {download_dir} does not exist. ")

# setup config
config: Optional[ConfigParser] = _apply_cli_options_to_config(
required_options={"farm_id", "queue_id"}, **args
)
queue_id: str = config_file.get_setting("defaults.queue_id", config=config)
farm_id: str = config_file.get_setting("defaults.farm_id", config=config)

boto3_session: boto3.Session = api.get_boto3_session(config=config)

output = _manifest_download(
download_dir=download_dir,
farm_id=farm_id,
queue_id=queue_id,
job_id=job_id,
step_id=step_id,
boto3_session=boto3_session,
logger=logger,
)
logger.json(dataclasses.asdict(output))
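
For scripting, the --json flag prints the ManifestDownloadResponse as a dictionary via dataclasses.asdict. A minimal sketch of consuming that shape, assuming ManifestDownload is a dataclass with the fields used below; the payload is illustrative, not captured output:

import json

# Illustrative payload matching dataclasses.asdict(ManifestDownloadResponse);
# the root, hash, and paths are made up for the example.
payload = (
    '{"downloaded": [{"manifest_root": "/mnt/projects/show", '
    '"local_manifest_path": "/tmp/manifests/mnt_projects_show-abc123-2024-10-31T12-00-00.manifest"}]}'
)
for item in json.loads(payload)["downloaded"]:
    print(item["manifest_root"], "->", item["local_manifest_path"])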


@cli_manifest.command(
name="upload",
help="BETA - Uploads a job attachment manifest file to a Content Addressable Storage's Manifest store. If calling via --s3-cas-path, it is recommended to use with --profile for a specific AWS profile with CAS S3 bucket access.",
help="BETA - Uploads a job attachment manifest file to a Content Addressable Storage's Manifest store. If calling via --s3-cas-path, it is recommended to use with --profile for a specific AWS profile with CAS S3 bucket access. Check exit code for success or failure.",
)
@click.argument("manifest_file")
@click.option("--profile", help="The AWS profile to use.")
@click.option("--s3-cas-path", help="The path to the Content Addressable Storage root.")
@click.option("--s3-cas-uri", help="The URI to the Content Addressable Storage S3 bucket and root.")
@click.option(
"--s3-manifest-prefix", help="Prefix subpath in the manifest folder to upload the manifest."
)
@click.option(
"--farm-id", help="The AWS Deadline Cloud Farm to use. Alternative to using --s3-cas-path."
"--farm-id", help="The AWS Deadline Cloud Farm to use. Alternative to using --s3-cas-uri."
)
@click.option(
"--queue-id", help="The AWS Deadline Cloud Queue to use. Alternative to using --s3-cas-path."
"--queue-id", help="The AWS Deadline Cloud Queue to use. Alternative to using --s3-cas-uri."
)
@click.option("--json", default=None, is_flag=True, help="Output is printed as JSON for scripting.")
@_handle_error
def manifest_upload(
manifest_file: str,
s3_cas_path: str,
s3_cas_uri: str,
s3_manifest_prefix: str,
json: bool,
**args,
):
raise NotImplementedError("This CLI is being implemented.")
# Input checking.
if not manifest_file or not os.path.isfile(manifest_file):
raise NonValidInputError(f"Specified manifest {manifest_file} does not exist. ")

# Where will we upload the manifest to?
required: set[str] = set()
if not s3_cas_uri:
required = {"farm_id", "queue_id"}

config: Optional[ConfigParser] = _apply_cli_options_to_config(required_options=required, **args)

# Logger
logger: ClickLogger = ClickLogger(is_json=json)

bucket_name: str = ""
session: boto3.Session = api.get_boto3_session(config=config)
if not s3_cas_uri:
farm_id = config_file.get_setting("defaults.farm_id", config=config)
queue_id = config_file.get_setting("defaults.queue_id", config=config)

deadline = api.get_boto3_client("deadline", config=config)
queue = deadline.get_queue(
farmId=farm_id,
queueId=queue_id,
)
queue_ja_settings: JobAttachmentS3Settings = JobAttachmentS3Settings(
**queue["jobAttachmentSettings"]
)
bucket_name = queue_ja_settings.s3BucketName
cas_path = queue_ja_settings.rootPrefix

# If we supplied a farm and queue, use the queue credentials.
session = api.get_queue_user_boto3_session(
deadline=deadline,
config=config,
farm_id=farm_id,
queue_id=queue_id,
queue_display_name=queue["displayName"],
)

else:
# Self-supplied CAS path.
uri_ja_settings: JobAttachmentS3Settings = JobAttachmentS3Settings.from_s3_root_uri(
s3_cas_uri
)
bucket_name = uri_ja_settings.s3BucketName
cas_path = uri_ja_settings.rootPrefix

logger.echo(
f"Uploading Manifest to {bucket_name} {cas_path} {S3_MANIFEST_FOLDER_NAME}, prefix: {s3_manifest_prefix}"
)
_manifest_upload(
manifest_file=manifest_file,
s3_bucket_name=bucket_name,
s3_cas_prefix=cas_path,
s3_key_prefix=s3_manifest_prefix,
boto_session=session,
logger=logger,
)
logger.echo("Uploading successful!")
205 changes: 203 additions & 2 deletions src/deadline/job_attachments/api/manifest.py
@@ -1,10 +1,14 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

import datetime
from io import BytesIO
import os
from pathlib import Path
from typing import List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

import boto3

from deadline.client.api._session import _get_queue_user_boto3_session, get_default_client_config
from deadline.client.cli._groups.click_logger import ClickLogger
from deadline.job_attachments._diff import _fast_file_list_to_manifest_diff, compare_manifest
from deadline.job_attachments._glob import _process_glob_inputs, _glob_paths
@@ -19,14 +23,23 @@
from deadline.job_attachments.asset_manifests.decode import decode_manifest
from deadline.job_attachments.asset_manifests.hash_algorithms import hash_data
from deadline.job_attachments.caches.hash_cache import HashCache
from deadline.job_attachments.download import (
get_manifest_from_s3,
get_output_manifests_by_asset_root,
merge_asset_manifests,
)
from deadline.job_attachments.models import (
S3_MANIFEST_FOLDER_NAME,
FileStatus,
GlobConfig,
JobAttachmentS3Settings,
ManifestDiff,
ManifestDownload,
ManifestDownloadResponse,
ManifestSnapshot,
default_glob_all,
)
from deadline.job_attachments.upload import S3AssetManager
from deadline.job_attachments.upload import S3AssetManager, S3AssetUploader

"""
APIs here should be business logic only. It should perform one thing, and one thing well.
@@ -247,3 +260,191 @@ def process_output(status: FileStatus, path: str, output_diff: ManifestDiff):
process_output(fast_diff_item[1], fast_diff_item[0], output)

return output


def _manifest_upload(
manifest_file: str,
s3_bucket_name: str,
s3_cas_prefix: str,
boto_session: boto3.Session,
s3_key_prefix: Optional[str] = None,
logger: ClickLogger = ClickLogger(False),
):
"""
BETA API - This API is still evolving but will be made public in the near future.
API to upload a job attachment manifest to the Content Addressable Storage. Manifests will be
uploaded to s3://{s3_bucket_name}/{cas_prefix}/Manifests/{s3_key_prefix}/{manifest_file_name} as per the Deadline CAS folder structure.
manifest_file: File path to the manifest file for upload.
s3_bucket_name: S3 bucket name.
s3_cas_prefix: S3 Content Addressable Storage prefix.
s3_key_prefix: [Optional] S3 prefix path within the Content Addressable Storage's manifest folder.
boto_session: Boto3 session.
logger: Click Logger instance to print to CLI as text or JSON.
"""
# S3 metadata

# Upload settings:
s3_metadata: Dict[str, Any] = {"Metadata": {}}
s3_metadata["Metadata"]["file-system-location-name"] = manifest_file

# Always upload the manifest file to the CAS root's Manifests folder with the original file name.
manifest_path: str = "/".join(
[s3_cas_prefix, S3_MANIFEST_FOLDER_NAME, s3_key_prefix, Path(manifest_file).name]
if s3_key_prefix
else [s3_cas_prefix, S3_MANIFEST_FOLDER_NAME, Path(manifest_file).name]
)

# S3 uploader.
upload = S3AssetUploader(session=boto_session)
with open(manifest_file) as manifest:
upload.upload_bytes_to_s3(
bytes=BytesIO(manifest.read().encode("utf-8")),
bucket=s3_bucket_name,
key=manifest_path,
progress_handler=logger.echo,
extra_args=s3_metadata,
)
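
A small follow-up sketch: fetching the uploaded manifest back with get_manifest_from_s3 (imported above) to confirm it landed where expected. The bucket and key are placeholders and assume the {cas_prefix}/Manifests/{s3_key_prefix}/{manifest_file_name} layout from the docstring:

import boto3

from deadline.job_attachments.download import get_manifest_from_s3

# Placeholder bucket and manifest key.
session = boto3.Session()
manifest = get_manifest_from_s3(
    manifest_key="DeadlineCloud/Manifests/adhoc/scene_root.manifest",
    s3_bucket="my-ja-bucket",
    session=session,
)
print(manifest.encode())  # serialize the decoded manifest back to its string form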


def _manifest_download(
download_dir: str,
farm_id: str,
queue_id: str,
job_id: str,
boto3_session: boto3.Session,
step_id: Optional[str] = None,
logger: ClickLogger = ClickLogger(False),
) -> ManifestDownloadResponse:
"""
BETA API - This API is still evolving but will be made public in the near future.
API to download the Job Attachment input manifests of a Job, and optionally the output manifests of a Step's dependencies.
download_dir: Download directory.
farm_id: The Deadline Farm to download from.
queue_id: The Deadline Queue to download from.
job_id: Job Id to download.
boto3_session: Boto3 session.
step_id: [Optional] Step ID. If provided, the output manifests of the step's dependencies are also downloaded.
logger: Click Logger instance to print to CLI as text or JSON.
return ManifestDownloadResponse: Downloaded manifest data, containing each manifest root and its local download path.
"""

# Deadline Client and get the Queue to download.
deadline = boto3_session.client("deadline", config=get_default_client_config())

queue: dict = deadline.get_queue(
farmId=farm_id,
queueId=queue_id,
)

# Assume the queue role to get session permissions.
queue_role_session: boto3.Session = _get_queue_user_boto3_session(
deadline=deadline,
base_session=boto3_session,
farm_id=farm_id,
queue_id=queue_id,
queue_display_name=queue["displayName"],
)

# Queue's Job Attachment settings.
queue_s3_settings = JobAttachmentS3Settings(**queue["jobAttachmentSettings"])

# Get S3 prefix
s3_prefix: Path = Path(queue_s3_settings.rootPrefix, S3_MANIFEST_FOLDER_NAME)

# Capture a list of successfully downloaded files for JSON output.
successful_downloads: List[ManifestDownload] = []

# Utility function to build up manifests by root.
manifests_by_root: Dict[str, List[BaseAssetManifest]] = dict()

def add_manifest_by_root(
manifests_by_root: Dict[str, list], root: str, manifest: BaseAssetManifest
):
if root not in manifests_by_root:
manifests_by_root[root] = []
manifests_by_root[root].append(manifest)

# Get input_manifest_paths from Deadline GetJob API
job: dict = deadline.get_job(farmId=farm_id, queueId=queue_id, jobId=job_id)
attachments: dict = job["attachments"] if "attachments" in job else {}
input_manifest_paths: List[Tuple[str, str]] = [
(manifest.get("inputManifestPath", ""), manifest["rootPath"])
for manifest in attachments.get("manifests", [])
]

# Download each input_manifest_path
for input_manifest_path, root_path in input_manifest_paths:
asset_manifest: BaseAssetManifest = get_manifest_from_s3(
manifest_key=(s3_prefix / input_manifest_path).as_posix(),
s3_bucket=queue_s3_settings.s3BucketName,
session=queue_role_session,
)
if asset_manifest is not None:
logger.echo(f"Downloaded input manifest for root: {root_path}")
add_manifest_by_root(
manifests_by_root=manifests_by_root, root=root_path, manifest=asset_manifest
)

# Now handle step-step dependencies
if step_id is not None:
# Get Step-Step dependencies.
nextToken = ""
step_dep_response = deadline.list_step_dependencies(
farmId=farm_id,
queueId=queue_id,
jobId=job_id,
stepId=step_id,
nextToken=nextToken,
)
for dependent_step in step_dep_response["dependencies"]:
logger.echo(f"Found Step-Step dependency. {dependent_step['stepId']}")

# Get manifests for the step-step dependency
step_manifests_by_root: Dict[str, List[BaseAssetManifest]] = (
get_output_manifests_by_asset_root(
s3_settings=queue_s3_settings,
farm_id=farm_id,
queue_id=queue_id,
job_id=job_id,
step_id=dependent_step["stepId"],
session=queue_role_session,
)
)
# Merge all manifests by root.
for root in step_manifests_by_root.keys():
for manifest in step_manifests_by_root[root]:
logger.echo(f"Found step-step output manifest for root: {root}")
add_manifest_by_root(
manifests_by_root=manifests_by_root, root=root, manifest=manifest
)

# Finally, merge all manifest paths to create unified manifests.
# TODO: Filter outputs by path
merged_manifests: Dict[str, BaseAssetManifest] = {}
for root in manifests_by_root.keys():
merged_manifest = merge_asset_manifests(manifests_by_root[root])
if merged_manifest:
merged_manifests[root] = merged_manifest

# Save the manifest files to disk.
for root in merged_manifests.keys():
# Save the merged manifest as {root}-{hash}-{timestamp}.manifest.
root_hash: str = hash_data(
root.encode("utf-8"), merged_manifests[root].get_default_hash_alg()
)
timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
manifest_name = root.replace("/", "_")
manifest_name = manifest_name[1:] if manifest_name[0] == "_" else manifest_name
manifest_name = f"{manifest_name}-{root_hash}-{timestamp}.manifest"

local_manifest_file_path = os.path.join(download_dir, manifest_name)
with open(local_manifest_file_path, "w") as file:
file.write(merged_manifests[root].encode())
successful_downloads.append(
ManifestDownload(manifest_root=root, local_manifest_path=str(local_manifest_file_path))
)
logger.echo(f"Downloaded merged manifest for root: {root} to: {local_manifest_file_path}")

# JSON output at the end.
output = ManifestDownloadResponse(downloaded=successful_downloads)
return output
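
A minimal usage sketch for the download API above; the farm, queue, and job IDs and the download directory are placeholders, and the default AWS credentials are assumed to be able to call the Deadline Cloud APIs and assume the queue role:

import boto3

from deadline.job_attachments.api.manifest import _manifest_download

session = boto3.Session()
response = _manifest_download(
    download_dir="/tmp/manifests",  # placeholder output directory
    farm_id="farm-0123456789abcdef0123456789abcdef",  # placeholder IDs
    queue_id="queue-0123456789abcdef0123456789abcdef",
    job_id="job-0123456789abcdef0123456789abcdef",
    boto3_session=session,
    step_id=None,  # pass a step ID to also pull step-step dependency output manifests
)
for item in response.downloaded:
    print(item.manifest_root, "->", item.local_manifest_path)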