Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(asset-cli): asset snapshot subcommand to capture manifests #369

Merged
merged 12 commits into from
Jul 4, 2024
7 changes: 6 additions & 1 deletion src/deadline/client/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"login",
"logout",
"create_job_from_job_bundle",
"hash_attachments",
"wait_for_create_job_to_complete",
"get_boto3_session",
"get_boto3_client",
Expand Down Expand Up @@ -52,7 +53,11 @@
list_storage_profiles_for_queue,
)
from ._queue_parameters import get_queue_parameter_definitions
from ._submit_job_bundle import create_job_from_job_bundle, wait_for_create_job_to_complete
from ._submit_job_bundle import (
create_job_from_job_bundle,
wait_for_create_job_to_complete,
hash_attachments,
)
from ._telemetry import (
get_telemetry_client,
get_deadline_cloud_library_telemetry_client,
Expand Down
4 changes: 2 additions & 2 deletions src/deadline/client/api/_submit_job_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def create_job_from_job_bundle(
print_function_callback("Job submission canceled.")
return None

_, asset_manifests = _hash_attachments(
_, asset_manifests = hash_attachments(
asset_manager=asset_manager,
asset_groups=upload_group.asset_groups,
total_input_files=upload_group.total_input_files,
Expand Down Expand Up @@ -396,7 +396,7 @@ def wait_for_create_job_to_complete(
)


def _hash_attachments(
def hash_attachments(
stangch marked this conversation as resolved.
Show resolved Hide resolved
asset_manager: S3AssetManager,
asset_groups: list[AssetRootGroup],
total_input_files: int,
Expand Down
46 changes: 46 additions & 0 deletions src/deadline/client/cli/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@
"_handle_error",
"_apply_cli_options_to_config",
"_cli_object_repr",
"_ProgressBarCallbackManager",
]

import sys
from configparser import ConfigParser
from typing import Any, Callable, Optional, Set

import click
from contextlib import ExitStack
from deadline.job_attachments.progress_tracker import ProgressReportMetadata

from ..config import config_file
from ..exceptions import DeadlineOperationError
from ..job_bundle import deadline_yaml_dump
from ._groups._sigint_handler import SigIntHandler

_PROMPT_WHEN_COMPLETE = "PROMPT_WHEN_COMPLETE"

# Set up the signal handler for handling Ctrl + C interruptions.
sigint_handler = SigIntHandler()


def _prompt_at_completion(ctx: click.Context):
"""
Expand Down Expand Up @@ -176,3 +183,42 @@ def _cli_object_repr(obj: Any):
# strings to end with "\n".
obj = _fix_multiline_strings(obj)
return deadline_yaml_dump(obj)


class _ProgressBarCallbackManager:
"""
Manages creation, update, and deletion of a progress bar. On first call of the callback, the progress bar is created. The progress bar is closed
on the final call (100% completion)
"""

BAR_NOT_CREATED = 0
BAR_CREATED = 1
BAR_CLOSED = 2

def __init__(self, length: int, label: str):
self._length = length
self._label = label
self._bar_status = self.BAR_NOT_CREATED
self._exit_stack = ExitStack()

def callback(self, upload_metadata: ProgressReportMetadata) -> bool:
if self._bar_status == self.BAR_CLOSED:
# from multithreaded execution this can be called after completion somtimes.
return sigint_handler.continue_operation
elif self._bar_status == self.BAR_NOT_CREATED:
# Note: click doesn't export the return type of progressbar(), so we suppress mypy warnings for
# not annotating the type of hashing_progress.
self._upload_progress = click.progressbar(length=self._length, label=self._label) # type: ignore[var-annotated]
self._exit_stack.enter_context(self._upload_progress)
self._bar_status = self.BAR_CREATED

total_progress = int(upload_metadata.progress)
new_progress = total_progress - self._upload_progress.pos
if new_progress > 0:
self._upload_progress.update(new_progress)

if total_progress == self._length or not sigint_handler.continue_operation:
self._bar_status = self.BAR_CLOSED
self._exit_stack.close()

return sigint_handler.continue_operation
stangch marked this conversation as resolved.
Show resolved Hide resolved
72 changes: 67 additions & 5 deletions src/deadline/client/cli/_groups/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@
* diff
* download
"""
import os
from pathlib import Path

import click

from .._common import _handle_error
from deadline.client import api
from deadline.job_attachments.upload import S3AssetManager, S3AssetUploader
from deadline.job_attachments.models import JobAttachmentS3Settings

from .._common import _handle_error, _ProgressBarCallbackManager
from ...exceptions import NonValidInputError


@click.group(name="asset")
Expand All @@ -22,8 +29,10 @@ def cli_asset():


@cli_asset.command(name="snapshot")
@click.option("--root-dir", help="The root directory to snapshot. ")
@click.option("--manifest-out", help="Destination path to directory where manifest is created. ")
@click.option("--root-dir", required=True, help="The root directory to snapshot. ")
@click.option(
"--manifest-out", default=None, help="Destination path to directory where manifest is created. "
)
@click.option(
"--recursive",
"-r",
Expand All @@ -33,11 +42,64 @@ def cli_asset():
default=False,
)
@_handle_error
def asset_snapshot(**args):
def asset_snapshot(root_dir: str, manifest_out: str, recursive: bool, **args):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question for the type annotation: Is manifest_out always a str? Is it a different type when the user does not provide --manifest-out?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the click library doesn't allow users to specify optional in the function params, rather in the above @click.options, so I added a default=None for --manifest-out

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, so the typing should say it can either be None or str right?

"""
Creates manifest of files specified root directory.
"""
click.echo("snapshot taken")
if not os.path.isdir(root_dir):
raise NonValidInputError(f"Specified root directory {root_dir} does not exist. ")

if manifest_out and not os.path.isdir(manifest_out):
raise NonValidInputError(f"Specified destination directory {manifest_out} does not exist. ")
elif manifest_out is None:
manifest_out = root_dir
godobyte marked this conversation as resolved.
Show resolved Hide resolved
click.echo(f"Manifest creation path defaulted to {root_dir} \n")

inputs = []
for root, dirs, files in os.walk(root_dir):
inputs.extend([str(os.path.join(root, file)) for file in files])
if not recursive:
break

# Placeholder Asset Manager
asset_manager = S3AssetManager(
farm_id=" ", queue_id=" ", job_attachment_settings=JobAttachmentS3Settings(" ", " ")
)
stangch marked this conversation as resolved.
Show resolved Hide resolved
asset_uploader = S3AssetUploader()
hash_callback_manager = _ProgressBarCallbackManager(length=100, label="Hashing Attachments")

upload_group = asset_manager.prepare_paths_for_upload(
input_paths=inputs, output_paths=[root_dir], referenced_paths=[]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious - what is this referenced_paths?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's some comments on this page: https://github.com/ddneilson/deadline-cloud-samples/blob/the_job_bundle/job_bundles/README.md

tldr: it's neither input nor output, but in case job references the name of the folder; in the case of snapshot + upload, the main goal is to upload files we care about, so I don't think referenced_paths are relevant here.

)
if upload_group.asset_groups:
_, manifests = api.hash_attachments(
asset_manager=asset_manager,
asset_groups=upload_group.asset_groups,
total_input_files=upload_group.total_input_files,
total_input_bytes=upload_group.total_input_bytes,
print_function_callback=click.echo,
hashing_progress_callback=hash_callback_manager.callback,
)

# Write created manifest into local file, at the specified location at manifest_out
for asset_root_manifests in manifests:
if asset_root_manifests.asset_manifest is None:
continue
source_root = Path(asset_root_manifests.root_path)
file_system_location_name = asset_root_manifests.file_system_location_name
(_, _, manifest_name) = asset_uploader._gather_upload_metadata(
manifest=asset_root_manifests.asset_manifest,
source_root=source_root,
file_system_location_name=file_system_location_name,
)
asset_uploader._write_local_input_manifest(
manifest_write_dir=manifest_out,
manifest_name=manifest_name,
manifest=asset_root_manifests.asset_manifest,
root_dir_name=os.path.basename(root_dir),
)

click.echo(f"Manifest created at {manifest_out}\n")


@cli_asset.command(name="upload")
Expand Down
43 changes: 1 addition & 42 deletions src/deadline/client/cli/_groups/bundle_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import re

import click
from contextlib import ExitStack
from botocore.exceptions import ClientError

from deadline.client import api
Expand All @@ -20,11 +19,10 @@
MisconfiguredInputsError,
)
from deadline.job_attachments.models import AssetUploadGroup, JobAttachmentsFileSystem
from deadline.job_attachments.progress_tracker import ProgressReportMetadata
from deadline.job_attachments._utils import _human_readable_file_size

from ...exceptions import DeadlineOperationError, CreateJobWaiterCanceled
from .._common import _apply_cli_options_to_config, _handle_error
from .._common import _apply_cli_options_to_config, _handle_error, _ProgressBarCallbackManager
from ._sigint_handler import SigIntHandler

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -282,42 +280,3 @@ def bundle_gui_submit(job_bundle_dir, browse, **args):
click.echo(f"Job ID: {response['jobId']}")
else:
click.echo("Job submission canceled.")


class _ProgressBarCallbackManager:
"""
Manages creation, update, and deletion of a progress bar. On first call of the callback, the progress bar is created. The progress bar is closed
on the final call (100% completion)
"""

BAR_NOT_CREATED = 0
BAR_CREATED = 1
BAR_CLOSED = 2

def __init__(self, length: int, label: str):
self._length = length
self._label = label
self._bar_status = self.BAR_NOT_CREATED
self._exit_stack = ExitStack()

def callback(self, upload_metadata: ProgressReportMetadata) -> bool:
if self._bar_status == self.BAR_CLOSED:
# from multithreaded execution this can be called after completion somtimes.
return sigint_handler.continue_operation
elif self._bar_status == self.BAR_NOT_CREATED:
# Note: click doesn't export the return type of progressbar(), so we suppress mypy warnings for
# not annotating the type of hashing_progress.
self._upload_progress = click.progressbar(length=self._length, label=self._label) # type: ignore[var-annotated]
self._exit_stack.enter_context(self._upload_progress)
self._bar_status = self.BAR_CREATED

total_progress = int(upload_metadata.progress)
new_progress = total_progress - self._upload_progress.pos
if new_progress > 0:
self._upload_progress.update(new_progress)

if total_progress == self._length or not sigint_handler.continue_operation:
self._bar_status = self.BAR_CLOSED
self._exit_stack.close()

return sigint_handler.continue_operation
58 changes: 50 additions & 8 deletions src/deadline/job_attachments/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,9 @@ def upload_assets(
"""

# Upload asset manifest
hash_alg = manifest.get_default_hash_alg()
manifest_bytes = manifest.encode().encode("utf-8")
manifest_name_prefix = hash_data(
f"{file_system_location_name or ''}{str(source_root)}".encode(), hash_alg
(hash_alg, manifest_bytes, manifest_name) = self._gather_upload_metadata(
manifest, source_root, file_system_location_name
)
manifest_name = f"{manifest_name_prefix}_input"

if partial_manifest_prefix:
partial_manifest_key = _join_s3_paths(partial_manifest_prefix, manifest_name)
Expand Down Expand Up @@ -203,25 +200,70 @@ def upload_assets(

return (partial_manifest_key, hash_data(manifest_bytes, hash_alg))

def _gather_upload_metadata(
self,
manifest: BaseAssetManifest,
source_root: Path,
file_system_location_name: Optional[str] = None,
) -> tuple[HashAlgorithm, bytes, str]:
"""
Gathers metadata information of manifest to be used for writing the local manifest
"""
hash_alg = manifest.get_default_hash_alg()
manifest_bytes = manifest.encode().encode("utf-8")
manifest_name_prefix = hash_data(
f"{file_system_location_name or ''}{str(source_root)}".encode(), hash_alg
)
manifest_name = f"{manifest_name_prefix}_input"

return (hash_alg, manifest_bytes, manifest_name)

def _write_local_manifest(
self,
manifest_write_dir: str,
manifest_name: str,
full_manifest_key: str,
manifest: BaseAssetManifest,
root_dir_name: Optional[str] = None,
) -> None:
"""
Writes a manifest file locally in a 'manifests' sub-directory.
Also creates/appends to a file mapping the local manifest name to the full S3 key in the same directory.
"""
local_manifest_file = Path(manifest_write_dir, "manifests", manifest_name)
self._write_local_input_manifest(manifest_write_dir, manifest_name, manifest, root_dir_name)

self._write_local_manifest_s3_mapping(manifest_write_dir, manifest_name, full_manifest_key)

def _write_local_input_manifest(
self,
manifest_write_dir: str,
manifest_name: str,
manifest: BaseAssetManifest,
root_dir_name: Optional[str],
):
"""
Creates 'manifests' sub-directory and writes a local input manifest file
"""
input_manifest_folder_name = "manifests"
if root_dir_name is not None:
input_manifest_folder_name = root_dir_name + "_" + input_manifest_folder_name

local_manifest_file = Path(manifest_write_dir, input_manifest_folder_name, manifest_name)
logger.info(f"Creating local manifest file: {local_manifest_file}\n")
local_manifest_file.parent.mkdir(parents=True, exist_ok=True)
with open(local_manifest_file, "w") as file:
file.write(manifest.encode())

# Create or append to an existing mapping file. We use this since path lengths can go beyond the
# file name length limit on Windows if we were to create the full S3 key path locally.
def _write_local_manifest_s3_mapping(
self,
manifest_write_dir: str,
manifest_name: str,
full_manifest_key: str,
):
"""
Create or append to an existing mapping file. We use this since path lengths can go beyond the
file name length limit on Windows if we were to create the full S3 key path locally.
"""
manifest_map_file = Path(manifest_write_dir, "manifests", "manifest_s3_mapping")
mapping = {"local_file": manifest_name, "s3_key": full_manifest_key}
with open(manifest_map_file, "a") as mapping_file:
Expand Down
Loading
Loading