Skip to content

Commit

Permalink
feat(asset-cli): asset upload subcommand
Browse files Browse the repository at this point in the history
Signed-off-by: Tang <[email protected]>
  • Loading branch information
stangch committed Jul 5, 2024
1 parent 2d5f2d4 commit a761995
Show file tree
Hide file tree
Showing 14 changed files with 1,207 additions and 80 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/code_quality.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Code Quality

on:
pull_request:
branches: [ mainline, release ]
branches: [ mainline, release, feature_assets_cli ]
workflow_call:
inputs:
branch:
Expand Down
19 changes: 18 additions & 1 deletion src/deadline/client/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
"login",
"logout",
"create_job_from_job_bundle",
"hash_attachments",
"upload_attachments",
<<<<<<< HEAD
=======
>>>>>>> a954d40 (feat(asset-cli): asset snapshot subcommand to capture manifests (#369))
=======
>>>>>>> d959a6f (feat(asset-cli): asset upload subcommand to upload assets to S3)
"wait_for_create_job_to_complete",
"get_boto3_session",
"get_boto3_client",
Expand Down Expand Up @@ -52,7 +59,17 @@
list_storage_profiles_for_queue,
)
from ._queue_parameters import get_queue_parameter_definitions
from ._submit_job_bundle import create_job_from_job_bundle, wait_for_create_job_to_complete
from ._submit_job_bundle import (
create_job_from_job_bundle,
wait_for_create_job_to_complete,
hash_attachments,
upload_attachments,
<<<<<<< HEAD
=======
>>>>>>> a954d40 (feat(asset-cli): asset snapshot subcommand to capture manifests (#369))
=======
>>>>>>> d959a6f (feat(asset-cli): asset upload subcommand to upload assets to S3)
)
from ._telemetry import (
get_telemetry_client,
get_deadline_cloud_library_telemetry_client,
Expand Down
8 changes: 4 additions & 4 deletions src/deadline/client/api/_submit_job_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def create_job_from_job_bundle(
print_function_callback("Job submission canceled.")
return None

_, asset_manifests = _hash_attachments(
_, asset_manifests = hash_attachments(
asset_manager=asset_manager,
asset_groups=upload_group.asset_groups,
total_input_files=upload_group.total_input_files,
Expand All @@ -285,7 +285,7 @@ def create_job_from_job_bundle(
hashing_progress_callback=hashing_progress_callback,
)

attachment_settings = _upload_attachments(
attachment_settings = upload_attachments(
asset_manager, asset_manifests, print_function_callback, upload_progress_callback
)
attachment_settings["fileSystem"] = JobAttachmentsFileSystem(
Expand Down Expand Up @@ -396,7 +396,7 @@ def wait_for_create_job_to_complete(
)


def _hash_attachments(
def hash_attachments(
asset_manager: S3AssetManager,
asset_groups: list[AssetRootGroup],
total_input_files: int,
Expand Down Expand Up @@ -432,7 +432,7 @@ def _default_update_hash_progress(hashing_metadata: Dict[str, str]) -> bool:
return hashing_summary, manifests


def _upload_attachments(
def upload_attachments(
asset_manager: S3AssetManager,
manifests: List[AssetRootManifest],
print_function_callback: Callable = lambda msg: None,
Expand Down
46 changes: 46 additions & 0 deletions src/deadline/client/cli/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@
"_handle_error",
"_apply_cli_options_to_config",
"_cli_object_repr",
"_ProgressBarCallbackManager",
]

import sys
from configparser import ConfigParser
from typing import Any, Callable, Optional, Set

import click
from contextlib import ExitStack
from deadline.job_attachments.progress_tracker import ProgressReportMetadata

from ..config import config_file
from ..exceptions import DeadlineOperationError
from ..job_bundle import deadline_yaml_dump
from ._groups._sigint_handler import SigIntHandler

_PROMPT_WHEN_COMPLETE = "PROMPT_WHEN_COMPLETE"

# Set up the signal handler for handling Ctrl + C interruptions.
sigint_handler = SigIntHandler()


def _prompt_at_completion(ctx: click.Context):
"""
Expand Down Expand Up @@ -176,3 +183,42 @@ def _cli_object_repr(obj: Any):
# strings to end with "\n".
obj = _fix_multiline_strings(obj)
return deadline_yaml_dump(obj)


class _ProgressBarCallbackManager:
"""
Manages creation, update, and deletion of a progress bar. On first call of the callback, the progress bar is created. The progress bar is closed
on the final call (100% completion)
"""

BAR_NOT_CREATED = 0
BAR_CREATED = 1
BAR_CLOSED = 2

def __init__(self, length: int, label: str):
self._length = length
self._label = label
self._bar_status = self.BAR_NOT_CREATED
self._exit_stack = ExitStack()

def callback(self, upload_metadata: ProgressReportMetadata) -> bool:
if self._bar_status == self.BAR_CLOSED:
# from multithreaded execution this can be called after completion somtimes.
return sigint_handler.continue_operation
elif self._bar_status == self.BAR_NOT_CREATED:
# Note: click doesn't export the return type of progressbar(), so we suppress mypy warnings for
# not annotating the type of hashing_progress.
self._upload_progress = click.progressbar(length=self._length, label=self._label) # type: ignore[var-annotated]
self._exit_stack.enter_context(self._upload_progress)
self._bar_status = self.BAR_CREATED

total_progress = int(upload_metadata.progress)
new_progress = total_progress - self._upload_progress.pos
if new_progress > 0:
self._upload_progress.update(new_progress)

if total_progress == self._length or not sigint_handler.continue_operation:
self._bar_status = self.BAR_CLOSED
self._exit_stack.close()

return sigint_handler.continue_operation
2 changes: 2 additions & 0 deletions src/deadline/client/cli/_deadline_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ._groups.job_group import cli_job
from ._groups.queue_group import cli_queue
from ._groups.worker_group import cli_worker
from ._groups.asset_group import cli_asset

logger = getLogger(__name__)

Expand Down Expand Up @@ -76,3 +77,4 @@ def main(ctx: click.Context, log_level: str):
main.add_command(cli_job)
main.add_command(cli_queue)
main.add_command(cli_worker)
main.add_command(cli_asset)
Loading

0 comments on commit a761995

Please sign in to comment.