Skip to content

Commit

Permalink
feat(asset-cli): asset upload subcommand (aws-deadline#392)
Browse files Browse the repository at this point in the history
Signed-off-by: Tang <[email protected]>
  • Loading branch information
stangch committed Jul 17, 2024
1 parent a954d40 commit 5854423
Show file tree
Hide file tree
Showing 8 changed files with 691 additions and 38 deletions.
2 changes: 2 additions & 0 deletions src/deadline/client/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"logout",
"create_job_from_job_bundle",
"hash_attachments",
"upload_attachments",
"wait_for_create_job_to_complete",
"get_boto3_session",
"get_boto3_client",
Expand Down Expand Up @@ -57,6 +58,7 @@
create_job_from_job_bundle,
wait_for_create_job_to_complete,
hash_attachments,
upload_attachments,
)
from ._telemetry import (
get_telemetry_client,
Expand Down
4 changes: 2 additions & 2 deletions src/deadline/client/api/_submit_job_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def create_job_from_job_bundle(
hashing_progress_callback=hashing_progress_callback,
)

attachment_settings = _upload_attachments(
attachment_settings = upload_attachments(
asset_manager, asset_manifests, print_function_callback, upload_progress_callback
)
attachment_settings["fileSystem"] = JobAttachmentsFileSystem(
Expand Down Expand Up @@ -432,7 +432,7 @@ def _default_update_hash_progress(hashing_metadata: Dict[str, str]) -> bool:
return hashing_summary, manifests


def _upload_attachments(
def upload_attachments(
asset_manager: S3AssetManager,
manifests: List[AssetRootManifest],
print_function_callback: Callable = lambda msg: None,
Expand Down
268 changes: 261 additions & 7 deletions src/deadline/client/cli/_groups/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,27 @@
"""
import os
from pathlib import Path
import concurrent.futures
from typing import List
import glob

import click

from deadline.client import api
from deadline.job_attachments.upload import S3AssetManager, S3AssetUploader
from deadline.job_attachments.models import JobAttachmentS3Settings
from deadline.job_attachments.upload import FileStatus, S3AssetManager, S3AssetUploader
from deadline.job_attachments.models import (
JobAttachmentS3Settings,
AssetRootManifest,
)
from deadline.job_attachments.asset_manifests.decode import decode_manifest
from deadline.job_attachments.asset_manifests.base_manifest import BaseAssetManifest
from deadline.job_attachments.caches import HashCache

from .._common import _handle_error, _ProgressBarCallbackManager
from ...exceptions import NonValidInputError
from .._common import _apply_cli_options_to_config, _handle_error, _ProgressBarCallbackManager
from ...exceptions import NonValidInputError, ManifestOutdatedError
from ...config import get_setting, config_file
import boto3
from botocore.client import BaseClient


@click.group(name="asset")
Expand Down Expand Up @@ -103,8 +115,15 @@ def asset_snapshot(root_dir: str, manifest_out: str, recursive: bool, **args):


@cli_asset.command(name="upload")
@click.option("--root-dir", help="The root directory of assets to upload. ")
@click.option(
"--root-dir",
help="The root directory of assets to upload. Defaults to the parent directory of --manifest-dir if not specified. ",
)
@click.option(
"--manifest", help="The path to manifest folder of the directory specified for upload. "
"--manifest-dir",
required=True,
help="The path to manifest folder of the directory specified for upload. ",
)
@click.option("--farm-id", help="The AWS Deadline Cloud Farm to use. ")
@click.option("--queue-id", help="The AWS Deadline Cloud Queue to use. ")
Expand All @@ -116,11 +135,104 @@ def asset_snapshot(root_dir: str, manifest_out: str, recursive: bool, **args):
default=False,
)
@_handle_error
def asset_upload(**args):
def asset_upload(root_dir: str, manifest_dir: str, update: bool, **args):
    """
    Uploads the assets in the provided manifest file to S3.

    The single ``*_input`` manifest inside ``manifest_dir`` is read, the files it
    lists are checked via ``diff_manifest`` and, if nothing is out of date (or
    ``update`` is set), uploaded through ``api.upload_attachments`` using the
    job attachment settings of the configured queue. Finally the S3 key of the
    uploaded manifest is recorded with ``_write_local_manifest_s3_mapping``.

    Args:
        root_dir: Root directory of the assets. When ``None`` the parent
            directory of ``manifest_dir`` is used.
        manifest_dir: Directory holding the local input manifest.
        update: When true, rewrite the local manifest for new/modified files
            instead of failing.
        **args: Remaining CLI options (e.g. farm/queue ids) forwarded to
            ``_apply_cli_options_to_config``.

    Raises:
        NonValidInputError: ``manifest_dir`` or ``root_dir`` is not an existing
            directory, or no manifest object could be read.
        ManifestOutdatedError: Files differ from the manifest and ``update`` is
            not set.
    """
    if not os.path.isdir(manifest_dir):
        raise NonValidInputError(f"Specified manifest directory {manifest_dir} does not exist. ")

    # Normalise the path: with a trailing separator os.path.dirname() would return
    # the manifest directory itself and os.path.basename() would return "".
    manifest_dir = os.path.abspath(manifest_dir)

    if root_dir is None:
        asset_root_dir = os.path.dirname(manifest_dir)
    else:
        if not os.path.isdir(root_dir):
            raise NonValidInputError(f"Specified root directory {root_dir} does not exist. ")
        asset_root_dir = root_dir

    config = _apply_cli_options_to_config(required_options={"farm_id", "queue_id"}, **args)
    upload_callback_manager: _ProgressBarCallbackManager = _ProgressBarCallbackManager(
        length=100, label="Uploading Attachments"
    )

    deadline: BaseClient = api.get_boto3_client("deadline", config=config)
    queue_id: str = get_setting("defaults.queue_id", config=config)
    farm_id: str = get_setting("defaults.farm_id", config=config)

    queue: dict = deadline.get_queue(
        farmId=farm_id,
        queueId=queue_id,
    )

    # assume queue role - session permissions
    queue_role_session: boto3.Session = api.get_queue_user_boto3_session(
        deadline=deadline,
        config=config,
        farm_id=farm_id,
        queue_id=queue_id,
        queue_display_name=queue["displayName"],
    )

    asset_manager: S3AssetManager = S3AssetManager(
        farm_id=farm_id,
        queue_id=queue_id,
        job_attachment_settings=JobAttachmentS3Settings(**queue["jobAttachmentSettings"]),
        session=queue_role_session,
    )

    asset_uploader: S3AssetUploader = S3AssetUploader()

    # read local manifest into BaseAssetManifest object
    asset_manifest: BaseAssetManifest = read_local_manifest(manifest=manifest_dir)
    if asset_manifest is None:
        raise NonValidInputError(
            f"Specified manifest directory {manifest_dir} does not contain a valid manifest input file. "
        )

    # Only wipe the previous S3 mapping once the manifest is known to be readable.
    clear_S3_mapping(manifest=manifest_dir)

    asset_root_manifest: AssetRootManifest = AssetRootManifest(
        root_path=asset_root_dir,
        asset_manifest=asset_manifest,
    )

    manifest_changes: List[tuple] = diff_manifest(
        asset_manager=asset_manager,
        asset_root_manifest=asset_root_manifest,
        manifest=manifest_dir,
        update=update,
    )

    # if there are modified files, will either auto --update manifest or prompt user of file discrepancy
    if manifest_changes:
        if not update:
            raise ManifestOutdatedError(
                f"Manifest contents in {manifest_dir} are outdated; versioning does not match local files in {asset_root_dir}. Please run with --update to fix current files. \n"
            )
        asset_root_manifest.asset_manifest = update_manifest(
            manifest=manifest_dir, new_or_modified_paths=manifest_changes
        )
        click.echo(f"Manifest information updated: {len(manifest_changes)} files updated. \n")

    attachment_settings: dict = api.upload_attachments(
        asset_manager=asset_manager,
        manifests=[asset_root_manifest],
        print_function_callback=click.echo,
        upload_progress_callback=upload_callback_manager.callback,
    )

    # Record which S3 key the local manifest was uploaded to.
    full_manifest_key: str = attachment_settings["manifests"][0]["inputManifestPath"]
    manifest_name = os.path.basename(full_manifest_key)
    manifest_dir_name = os.path.basename(manifest_dir)
    asset_uploader._write_local_manifest_s3_mapping(
        manifest_write_dir=asset_root_dir,
        manifest_name=manifest_name,
        full_manifest_key=full_manifest_key,
        manifest_dir_name=manifest_dir_name,
    )

    click.echo(f"Upload of {asset_root_dir} complete. \n")


@cli_asset.command(name="diff")
Expand Down Expand Up @@ -155,3 +267,145 @@ def asset_download(**args):
Downloads input manifest of previously submitted job.
"""
click.echo("download complete")


def read_local_manifest(manifest: str) -> BaseAssetManifest:
    """
    Load the single ``*_input`` manifest file found in the ``manifest`` folder.

    Returns the object produced by ``decode_manifest`` for that file's contents.
    Raises ``ValueError`` when the folder holds no ``*_input`` file and
    ``NonValidInputError`` when it holds more than one.
    """
    candidates = glob.glob(os.path.join(manifest, "*_input"))
    found = len(candidates)

    if found == 0:
        raise ValueError(f"No manifest files found in {manifest}")
    if found > 1:
        raise NonValidInputError(
            f"Multiple input manifest files are not supported, found: {candidates}."
        )

    # Exactly one candidate remains at this point.
    (manifest_file_path,) = candidates

    with open(manifest_file_path, "r") as handle:
        return decode_manifest(handle.read())


def clear_S3_mapping(manifest: str):
    """
    Empty every file directly inside ``manifest`` whose name ends with
    ``manifest_s3_mapping``; other files are left untouched.
    """
    mapping_files = [
        os.path.join(manifest, entry)
        for entry in os.listdir(manifest)
        if entry.endswith("manifest_s3_mapping")
    ]

    for mapping_file in mapping_files:
        # Opening for writing truncates the existing contents; nothing is written back.
        open(mapping_file, "w").close()


def diff_manifest(
    asset_manager: S3AssetManager,
    asset_root_manifest: AssetRootManifest,
    manifest: str,
    update: bool,
) -> List[tuple]:
    """
    Gets the file paths in specified manifest if the contents of file have changed since its last snapshot.

    Every path listed in the manifest is resolved against the manifest's root
    path and handed to ``find_file_with_status``; entries located inside the
    manifest folder itself are left out.

    Args:
        asset_manager: Asset manager forwarded to ``find_file_with_status``.
        asset_root_manifest: Root path plus the manifest whose paths are checked.
        manifest: Path of the manifest folder; only its final component is used,
            to recognise manifest-folder entries.
        update: Forwarded unchanged to ``find_file_with_status``.

    Returns:
        ``(FileStatus, manifest entry)`` tuples for files reported as NEW or MODIFIED.

    Raises:
        NonValidInputError: ``asset_root_manifest`` carries no manifest object.
    """
    asset_manifest = asset_root_manifest.asset_manifest
    if asset_manifest is None:
        raise NonValidInputError("Manifest object not found, please check input manifest. ")

    # normpath drops a trailing separator, which would otherwise make basename()
    # return "" and cause every path to be treated as a manifest-folder entry.
    manifest_dir_name = os.path.basename(os.path.normpath(manifest))
    root_path = asset_root_manifest.root_path

    # Match a whole leading path component so that siblings which merely share the
    # prefix (e.g. "<name>_backup/file") are still checked.
    # NOTE(review): manifest paths are presumably "/"-separated; os.sep is accepted too.
    folder_prefixes = (manifest_dir_name + "/", manifest_dir_name + os.sep)

    def _inside_manifest_dir(relative_path: str) -> bool:
        if not manifest_dir_name:
            return False
        return relative_path == manifest_dir_name or relative_path.startswith(folder_prefixes)

    # skip the manifest folder, or else every upload will need an update after a previous change
    input_paths: List[Path] = [
        Path(root_path, base_manifest_path.path)
        for base_manifest_path in asset_manifest.paths
        if not _inside_manifest_dir(base_manifest_path.path)
    ]

    return find_file_with_status(
        asset_manager=asset_manager,
        input_paths=input_paths,
        root_path=root_path,
        update=update,
        statuses=[FileStatus.NEW, FileStatus.MODIFIED],
    )


def find_file_with_status(
    asset_manager: S3AssetManager,
    input_paths: List[Path],
    root_path: str,
    update: bool,
    statuses: List[FileStatus],
) -> List[tuple]:
    """
    Run ``asset_manager._process_input_path`` on every input path, using the local
    hash cache, and collect the files whose resulting status is one of ``statuses``.

    Returns a list of ``(file_status, manifest_path)`` tuples, in completion order.
    """
    matches: List[tuple] = []
    cache_dir = config_file.get_cache_directory()

    with HashCache(cache_dir) as hash_cache, concurrent.futures.ThreadPoolExecutor() as pool:
        # One task per path; they all share the same hash cache.
        pending = [
            pool.submit(
                asset_manager._process_input_path,
                path=input_path,
                root_path=root_path,
                hash_cache=hash_cache,
                update=update,
            )
            for input_path in input_paths
        ]

        for finished in concurrent.futures.as_completed(pending):
            # The middle element of the result is not needed here.
            file_status, _, manifest_path = finished.result()
            if file_status in statuses:
                matches.append((file_status, manifest_path))

    return matches


def update_manifest(manifest: str, new_or_modified_paths: List[tuple]) -> BaseAssetManifest:
    """
    Updates the local manifest file to reflect modified or new files

    The single ``*_input`` manifest inside ``manifest`` is decoded, its entries
    are merged with ``new_or_modified_paths`` (keyed by entry path), and the
    result is written back to the same file.

    Args:
        manifest: Path of the folder containing the ``*_input`` manifest file.
        new_or_modified_paths: ``(status, manifest entry)`` tuples; only the
            entry (second element) is used.

    Returns:
        The decoded manifest object with its ``paths`` updated.

    Raises:
        ValueError: No ``*_input`` file exists in ``manifest``.
        NonValidInputError: More than one ``*_input`` file exists.
    """
    input_files = glob.glob(os.path.join(manifest, "*_input"))

    if not input_files:
        raise ValueError(f"No manifest files found in {manifest}")
    if len(input_files) >= 2:
        raise NonValidInputError(
            f"Multiple input manifest files are not supported, found: {input_files}."
        )

    manifest_file_path = input_files[0]

    with open(manifest_file_path, "r") as manifest_file:
        local_base_asset_manifest = decode_manifest(manifest_file.read())

    # maps paths of local to optimize updating of manifest entries
    entries_by_path = {entry.path: entry for entry in local_base_asset_manifest.paths}

    for _, changed_entry in new_or_modified_paths:
        # Replace the whole entry instead of copying only its hash, so that any other
        # per-file metadata on the entry is refreshed as well.
        # NOTE(review): presumably entries also carry size/mtime - confirm against the
        # manifest path model. New paths are appended, existing ones keep their position.
        entries_by_path[changed_entry.path] = changed_entry

    # write to local manifest
    local_base_asset_manifest.paths = list(entries_by_path.values())
    with open(manifest_file_path, "w") as manifest_file:
        manifest_file.write(local_base_asset_manifest.encode())

    return local_base_asset_manifest
4 changes: 4 additions & 0 deletions src/deadline/client/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ class UserInitiatedCancel(Exception):

class NonValidInputError(Exception):
    """Raised when input supplied by the user is not valid, e.g. a directory argument that does not exist."""


class ManifestOutdatedError(Exception):
    """Raised when local files differ from the versions captured in the manifest."""
Loading

0 comments on commit 5854423

Please sign in to comment.