Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(asset-cli): asset upload subcommand to upload assets to S3 #392

Merged
merged 1 commit into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/deadline/client/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"logout",
"create_job_from_job_bundle",
"hash_attachments",
"upload_attachments",
"wait_for_create_job_to_complete",
"get_boto3_session",
"get_boto3_client",
Expand Down Expand Up @@ -57,6 +58,7 @@
create_job_from_job_bundle,
wait_for_create_job_to_complete,
hash_attachments,
upload_attachments,
)
from ._telemetry import (
get_telemetry_client,
Expand Down
4 changes: 2 additions & 2 deletions src/deadline/client/api/_submit_job_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def create_job_from_job_bundle(
hashing_progress_callback=hashing_progress_callback,
)

attachment_settings = _upload_attachments(
attachment_settings = upload_attachments(
asset_manager, asset_manifests, print_function_callback, upload_progress_callback
)
attachment_settings["fileSystem"] = JobAttachmentsFileSystem(
Expand Down Expand Up @@ -432,7 +432,7 @@ def _default_update_hash_progress(hashing_metadata: Dict[str, str]) -> bool:
return hashing_summary, manifests


def _upload_attachments(
def upload_attachments(
asset_manager: S3AssetManager,
manifests: List[AssetRootManifest],
print_function_callback: Callable = lambda msg: None,
Expand Down
267 changes: 260 additions & 7 deletions src/deadline/client/cli/_groups/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,27 @@
"""
import os
from pathlib import Path
import concurrent.futures
from typing import List
import glob

import click

from deadline.client import api
from deadline.job_attachments.upload import S3AssetManager, S3AssetUploader
from deadline.job_attachments.models import JobAttachmentS3Settings
from deadline.job_attachments.upload import FileStatus, S3AssetManager, S3AssetUploader
from deadline.job_attachments.models import (
JobAttachmentS3Settings,
AssetRootManifest,
)
from deadline.job_attachments.asset_manifests.decode import decode_manifest
from deadline.job_attachments.asset_manifests.base_manifest import BaseAssetManifest
from deadline.job_attachments.caches import HashCache

from .._common import _handle_error, _ProgressBarCallbackManager
from ...exceptions import NonValidInputError
from .._common import _apply_cli_options_to_config, _handle_error, _ProgressBarCallbackManager
from ...exceptions import NonValidInputError, ManifestOutdatedError
from ...config import get_setting, config_file
import boto3
from botocore.client import BaseClient


@click.group(name="asset")
Expand Down Expand Up @@ -104,7 +116,13 @@ def asset_snapshot(root_dir: str, manifest_out: str, recursive: bool, **args):

@cli_asset.command(name="upload")
@click.option(
"--manifest", help="The path to manifest folder of the directory specified for upload. "
"--root-dir",
help="The root directory of assets to upload. Defaults to the parent directory of --manifest-dir if not specified. ",
)
@click.option(
"--manifest-dir",
required=True,
help="The path to manifest folder of the directory specified for upload. ",
)
@click.option("--farm-id", help="The AWS Deadline Cloud Farm to use. ")
@click.option("--queue-id", help="The AWS Deadline Cloud Queue to use. ")
Expand All @@ -116,11 +134,104 @@ def asset_snapshot(root_dir: str, manifest_out: str, recursive: bool, **args):
default=False,
)
@_handle_error
def asset_upload(root_dir: str, manifest_dir: str, update: bool, **args):
    """
    Uploads the assets in the provided manifest file to S3.

    Args:
        root_dir: Root directory of the assets to upload. When None, the parent
            directory of manifest_dir is used.
        manifest_dir: Folder holding the local input manifest for the upload.
        update: When True, manifest entries for new or modified files are rewritten
            before uploading; when False, such files abort the upload.
        **args: Remaining CLI options, forwarded to _apply_cli_options_to_config
            (farm_id and queue_id are required).

    Raises:
        NonValidInputError: manifest_dir or root_dir is not an existing directory, or
            no manifest object could be read from manifest_dir.
        ManifestOutdatedError: Local files differ from the manifest and update is False.
    """
    if not os.path.isdir(manifest_dir):
        raise NonValidInputError(f"Specified manifest directory {manifest_dir} does not exist. ")

    # Drop any trailing separator: otherwise dirname() below would return the manifest
    # folder itself and basename() would return an empty string.
    manifest_dir = os.path.normpath(manifest_dir)

    if root_dir is None:
        asset_root_dir = os.path.dirname(manifest_dir)
    else:
        if not os.path.isdir(root_dir):
            raise NonValidInputError(f"Specified root directory {root_dir} does not exist. ")
        asset_root_dir = root_dir

    config = _apply_cli_options_to_config(required_options={"farm_id", "queue_id"}, **args)
    upload_callback_manager: _ProgressBarCallbackManager = _ProgressBarCallbackManager(
        length=100, label="Uploading Attachments"
    )

    deadline: BaseClient = api.get_boto3_client("deadline", config=config)
    queue_id: str = get_setting("defaults.queue_id", config=config)
    farm_id: str = get_setting("defaults.farm_id", config=config)

    queue: dict = deadline.get_queue(
        farmId=farm_id,
        queueId=queue_id,
    )

    # assume queue role - session permissions
    queue_role_session: boto3.Session = api.get_queue_user_boto3_session(
        deadline=deadline,
        config=config,
        farm_id=farm_id,
        queue_id=queue_id,
        queue_display_name=queue["displayName"],
    )

    asset_manager: S3AssetManager = S3AssetManager(
        farm_id=farm_id,
        queue_id=queue_id,
        job_attachment_settings=JobAttachmentS3Settings(**queue["jobAttachmentSettings"]),
        session=queue_role_session,
    )

    asset_uploader: S3AssetUploader = S3AssetUploader()

    # read local manifest into BaseAssetManifest object
    asset_manifest: BaseAssetManifest = read_local_manifest(manifest=manifest_dir)
    # empty any S3 mapping left by a previous upload before a new one is written below
    clear_S3_mapping(manifest=manifest_dir)

    if asset_manifest is None:
        raise NonValidInputError(
            f"Specified manifest directory {manifest_dir} does not contain a valid manifest input file. "
        )

    asset_root_manifest: AssetRootManifest = AssetRootManifest(
        root_path=asset_root_dir,
        asset_manifest=asset_manifest,
    )

    manifest_changes: List[tuple] = diff_manifest(
        asset_manager=asset_manager,
        asset_root_manifest=asset_root_manifest,
        manifest=manifest_dir,
        update=update,
    )

    # if there are modified files, will either auto --update manifest or prompt user of file discrepancy
    if manifest_changes:
        if not update:
            raise ManifestOutdatedError(
                f"Manifest contents in {manifest_dir} are outdated; versioning does not match local files in {asset_root_dir}. Please run with --update to fix current files. \n"
            )
        asset_root_manifest.asset_manifest = update_manifest(
            manifest=manifest_dir, new_or_modified_paths=manifest_changes
        )
        click.echo(f"Manifest information updated: {len(manifest_changes)} files updated. \n")

    attachment_settings: dict = api.upload_attachments(
        asset_manager=asset_manager,
        manifests=[asset_root_manifest],
        print_function_callback=click.echo,
        upload_progress_callback=upload_callback_manager.callback,
    )

    # record which S3 key the uploaded manifest was stored under, next to the local manifest
    full_manifest_key: str = attachment_settings["manifests"][0]["inputManifestPath"]
    manifest_name = os.path.basename(full_manifest_key)
    manifest_dir_name = os.path.basename(manifest_dir)
    asset_uploader._write_local_manifest_s3_mapping(
        manifest_write_dir=asset_root_dir,
        manifest_name=manifest_name,
        full_manifest_key=full_manifest_key,
        manifest_dir_name=manifest_dir_name,
    )

    click.echo(f"Upload of {asset_root_dir} complete. \n")


@cli_asset.command(name="diff")
Expand Down Expand Up @@ -155,3 +266,145 @@ def asset_download(**args):
Downloads input manifest of previously submitted job.
"""
click.echo("download complete")


def read_local_manifest(manifest: str) -> BaseAssetManifest:
    """
    Load the single input manifest stored in a manifest folder.

    Args:
        manifest: Path to the folder searched for a file whose name ends in "_input".

    Returns:
        The result of passing that file's text to decode_manifest.

    Raises:
        ValueError: No "*_input" file exists in the folder.
        NonValidInputError: More than one "*_input" file exists in the folder.
    """
    candidates = glob.glob(os.path.join(manifest, "*_input"))

    if len(candidates) == 0:
        raise ValueError(f"No manifest files found in {manifest}")
    if len(candidates) > 1:
        raise NonValidInputError(
            f"Multiple input manifest files are not supported, found: {candidates}."
        )

    (manifest_file_path,) = candidates
    with open(manifest_file_path, "r") as input_file:
        manifest_text = input_file.read()

    # Decoding only needs the text, so the file handle is released first.
    return decode_manifest(manifest_text)
Comment on lines +286 to +290
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be addressed in a later code cleanup pass, but we don't need to decode the manifest inside the with block here.



def clear_S3_mapping(manifest: str):
    """
    Empty every S3 mapping file already present in a manifest folder.

    Any directory entry whose name ends in "manifest_s3_mapping" is truncated to
    zero length; other entries are left alone and nothing is deleted.

    Args:
        manifest: Path to the manifest folder to scan (not searched recursively).
    """
    mapping_paths = [
        os.path.join(manifest, entry)
        for entry in os.listdir(manifest)
        if entry.endswith("manifest_s3_mapping")
    ]
    for mapping_path in mapping_paths:
        # Opening in "w" mode truncates the file; nothing is written to it.
        open(mapping_path, "w").close()


def diff_manifest(
    asset_manager: S3AssetManager,
    asset_root_manifest: AssetRootManifest,
    manifest: str,
    update: bool,
) -> List[tuple]:
    """
    Gets the file paths in specified manifest if the contents of file have changed since its last snapshot.

    Args:
        asset_manager: Forwarded to find_file_with_status to process each path.
        asset_root_manifest: Supplies the root path and the manifest whose paths are checked.
        manifest: Path to the manifest folder; entries inside it are excluded from the check.
        update: Forwarded unchanged to find_file_with_status.

    Returns:
        The (status, manifest entry) tuples returned by find_file_with_status for
        files reported as NEW or MODIFIED.

    Raises:
        NonValidInputError: asset_root_manifest carries no manifest object.
    """
    asset_manifest = asset_root_manifest.asset_manifest
    if asset_manifest is None:
        raise NonValidInputError("Manifest object not found, please check input manifest. ")

    root_path = asset_root_manifest.root_path

    # normpath drops a trailing separator, so the folder name is never the empty
    # string (an empty prefix would match, and therefore skip, every path).
    manifest_dir_name = os.path.basename(os.path.normpath(manifest))
    # Match the folder as a whole path component so that sibling files which merely
    # share the prefix (e.g. "manifests_notes.txt") are still checked.
    # NOTE(review): assumes manifest paths are relative to the root and separated by
    # "/" or the OS separator - confirm against the manifest format.
    manifest_dir_prefixes = (f"{manifest_dir_name}/", f"{manifest_dir_name}{os.sep}")

    # skip the manifest folder, or else every upload will need an update after a previous change
    input_paths: List[Path] = [
        Path(root_path, entry.path)
        for entry in asset_manifest.paths
        if entry.path != manifest_dir_name and not entry.path.startswith(manifest_dir_prefixes)
    ]

    return find_file_with_status(
        asset_manager=asset_manager,
        input_paths=input_paths,
        root_path=root_path,
        update=update,
        statuses=[FileStatus.NEW, FileStatus.MODIFIED],
    )


def find_file_with_status(
    asset_manager: S3AssetManager,
    input_paths: List[Path],
    root_path: str,
    update: bool,
    statuses: List[FileStatus],
) -> List[tuple]:
    """
    Process each input path on a thread pool and keep the ones with a wanted status.

    Every path is handed to asset_manager._process_input_path together with a hash
    cache opened on the configured cache directory.

    Args:
        asset_manager: Object whose _process_input_path is run for each path.
        input_paths: Paths to process.
        root_path: Passed through to _process_input_path.
        update: Passed through to _process_input_path.
        statuses: File statuses to keep.

    Returns:
        A list of (file status, manifest entry) tuples, one per kept file, in the
        order the worker tasks completed.
    """
    matches: List[tuple] = []

    with HashCache(config_file.get_cache_directory()) as hash_cache:
        with concurrent.futures.ThreadPoolExecutor() as pool:
            pending = [
                pool.submit(
                    asset_manager._process_input_path,
                    path=input_path,
                    root_path=root_path,
                    hash_cache=hash_cache,
                    update=update,
                )
                for input_path in input_paths
            ]
            for finished in concurrent.futures.as_completed(pending):
                # The middle element of the result is not needed here.
                file_status, _, manifest_entry = finished.result()
                if file_status not in statuses:
                    continue
                matches.append((file_status, manifest_entry))

    return matches


def update_manifest(manifest: str, new_or_modified_paths: List[tuple]) -> BaseAssetManifest:
    """
    Rewrite the local input manifest so it reflects new or modified files.

    Args:
        manifest: Path to the folder holding exactly one "*_input" manifest file.
        new_or_modified_paths: (status, manifest entry) tuples; only the entry is used.

    Returns:
        The decoded manifest after its path list was updated and written back to disk.

    Raises:
        ValueError: No "*_input" file exists in the folder.
        NonValidInputError: More than one "*_input" file exists in the folder.
    """
    candidates = glob.glob(os.path.join(manifest, "*_input"))

    if len(candidates) == 0:
        raise ValueError(f"No manifest files found in {manifest}")
    if len(candidates) > 1:
        raise NonValidInputError(
            f"Multiple input manifest files are not supported, found: {candidates}."
        )

    (manifest_file_path,) = candidates
    with open(manifest_file_path, "r") as manifest_file:
        manifest_text = manifest_file.read()
    local_manifest = decode_manifest(manifest_text)

    # Index the existing entries by path so each change is a direct lookup.
    entries_by_path = {entry.path: entry for entry in local_manifest.paths}

    for _, changed_entry in new_or_modified_paths:
        known_entry = entries_by_path.get(changed_entry.path)
        if known_entry is None:
            # Not in the manifest yet: add the entry as-is.
            entries_by_path[changed_entry.path] = changed_entry
        else:
            # Already tracked: only its hash is refreshed.
            # NOTE(review): any other per-file fields on the entry keep their old
            # values - confirm that is intended.
            known_entry.hash = changed_entry.hash

    # Persist the merged entry list back to the same manifest file.
    local_manifest.paths = list(entries_by_path.values())
    with open(manifest_file_path, "w") as manifest_file:
        manifest_file.write(local_manifest.encode())

    return local_manifest
4 changes: 4 additions & 0 deletions src/deadline/client/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ class UserInitiatedCancel(Exception):

class NonValidInputError(Exception):
    """Raised when input supplied by the user is not valid."""


class ManifestOutdatedError(Exception):
    """Raised when local files differ from the version captured in the manifest."""
Loading
Loading