fix!: always prompt user about misconfigured inputs (#309)
* fix!: always prompt user about misconfigured inputs

Signed-off-by: Morgan Epp <[email protected]>

* fix!: Add empty paths to referenced paths

Signed-off-by: Caden Marofke <[email protected]>

---------

Signed-off-by: Morgan Epp <[email protected]>
Signed-off-by: Caden Marofke <[email protected]>
Co-authored-by: Morgan Epp <[email protected]>
marofke and epmog authored Apr 25, 2024
1 parent 6f9639c commit f8d5826
Showing 14 changed files with 546 additions and 128 deletions.
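
The breaking change (hence the `fix!` prefix) is the signature of `decide_cancel_submission_callback` on `create_job_from_job_bundle`: instead of three separate arguments (the per-root count of files outside storage profile locations, the total input file count, and the total input bytes), the callback now receives the whole `AssetUploadGroup`. A minimal sketch of a callback written against the new interface; the printed message and the cancel policy below are illustrative only, not part of the commit:

from deadline.job_attachments.models import AssetUploadGroup

def decide_cancel_submission(upload_group: AssetUploadGroup) -> bool:
    # Return True to cancel the submission, False to let it proceed.
    # The totals that were previously passed as separate arguments are now
    # attributes of the upload group.
    print(
        f"About to upload {upload_group.total_input_files} input files "
        f"({upload_group.total_input_bytes} bytes)."
    )
    # Illustrative policy: cancel when any asset group falls outside a
    # configured storage profile location (no file_system_location_name).
    return any(not g.file_system_location_name for g in upload_group.asset_groups)

Such a callback is passed as `decide_cancel_submission_callback=decide_cancel_submission`, in place of the default lambda shown in the diff below.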
44 changes: 36 additions & 8 deletions src/deadline/client/api/_submit_job_bundle.py
@@ -32,10 +32,12 @@
JobParameter,
)
from ..job_bundle.submission import AssetReferences, split_parameter_args
from ...job_attachments.exceptions import MisconfiguredInputsError
from ...job_attachments.models import (
JobAttachmentsFileSystem,
AssetRootGroup,
AssetRootManifest,
AssetUploadGroup,
JobAttachmentS3Settings,
)
from ...job_attachments.progress_tracker import SummaryStatistics, ProgressReportMetadata
@@ -57,11 +59,12 @@ def create_job_from_job_bundle(
max_retries_per_task: Optional[int] = None,
print_function_callback: Callable[[str], None] = lambda msg: None,
decide_cancel_submission_callback: Callable[
[dict[str, int], int, int], bool
] = lambda paths, files, bytes: False,
[AssetUploadGroup], bool
] = lambda upload_group: False,
hashing_progress_callback: Optional[Callable[[ProgressReportMetadata], bool]] = None,
upload_progress_callback: Optional[Callable[[ProgressReportMetadata], bool]] = None,
create_job_result_callback: Optional[Callable[[], bool]] = None,
require_paths_exist: bool = False,
) -> Union[str, None]:
"""
Creates a job in the AWS Deadline Cloud farm/queue configured as default for the
@@ -210,13 +213,42 @@ def create_job_from_job_bundle(
# Hash and upload job attachments if there are any
if asset_references and "jobAttachmentSettings" in queue:
# Extend input_filenames with all the files in the input_directories
missing_directories: set[str] = set()
for directory in asset_references.input_directories:
if not os.path.isdir(directory):
if require_paths_exist:
missing_directories.add(directory)
else:
logger.warning(
f"Input path '{directory}' does not exist. Adding to referenced paths."
)
asset_references.referenced_paths.add(directory)
continue

is_dir_empty = True
for root, _, files in os.walk(directory):
if not files:
continue
is_dir_empty = False
asset_references.input_filenames.update(
os.path.normpath(os.path.join(root, file)) for file in files
)
# Empty directories just become references since there's nothing to upload
if is_dir_empty:
logger.info(f"Input directory '{directory}' is empty. Adding to referenced paths.")
asset_references.referenced_paths.add(directory)
asset_references.input_directories.clear()

if missing_directories:
all_missing_directories = "\n\t".join(sorted(list(missing_directories)))
misconfigured_directories_msg = (
"Job submission contains misconfigured input directories and cannot be submitted."
" All input directories must exist."
f"\nNon-existent directories:\n\t{all_missing_directories}"
)

raise MisconfiguredInputsError(misconfigured_directories_msg)

queue_role_session = api.get_queue_user_boto3_session(
deadline=deadline,
config=config,
Expand All @@ -233,18 +265,14 @@ def create_job_from_job_bundle(
)

upload_group = asset_manager.prepare_paths_for_upload(
job_bundle_path=job_bundle_dir,
input_paths=sorted(asset_references.input_filenames),
output_paths=sorted(asset_references.output_directories),
referenced_paths=sorted(asset_references.referenced_paths),
storage_profile=storage_profile,
require_paths_exist=require_paths_exist,
)
if upload_group.asset_groups:
if decide_cancel_submission_callback(
upload_group.num_outside_files_by_root,
upload_group.total_input_files,
upload_group.total_input_bytes,
):
if decide_cancel_submission_callback(upload_group):
print_function_callback("Job submission canceled.")
return None

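With `require_paths_exist=True`, input directories that do not exist now raise `MisconfiguredInputsError` instead of being logged and moved to the referenced paths, while empty input directories are still downgraded to referenced paths since there is nothing to upload. A rough sketch of calling the API with the new flag — assuming the function is reached through `deadline.client.api` as the CLI does and that the keyword matches the `job_bundle_dir` argument used there; the bundle path is a placeholder:

from deadline.client import api
from deadline.job_attachments.exceptions import MisconfiguredInputsError

try:
    job_id = api.create_job_from_job_bundle(
        job_bundle_dir="./my_job_bundle",  # placeholder path, not from the commit
        require_paths_exist=True,          # new in this commit; defaults to False
    )
except MisconfiguredInputsError as exc:
    # The error message lists each non-existent input directory on its own line.
    print(exc)
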
86 changes: 60 additions & 26 deletions src/deadline/client/cli/_groups/bundle_group.py
@@ -14,8 +14,12 @@

from deadline.client import api
from deadline.client.config import config_file, get_setting, set_setting
from deadline.job_attachments.exceptions import AssetSyncError, AssetSyncCancelledError
from deadline.job_attachments.models import JobAttachmentsFileSystem
from deadline.job_attachments.exceptions import (
AssetSyncError,
AssetSyncCancelledError,
MisconfiguredInputsError,
)
from deadline.job_attachments.models import AssetUploadGroup, JobAttachmentsFileSystem
from deadline.job_attachments.progress_tracker import ProgressReportMetadata
from deadline.job_attachments._utils import _human_readable_file_size

@@ -87,6 +91,11 @@ def validate_parameters(ctx, param, value):
is_flag=True,
help="Skip any confirmation prompts",
)
@click.option(
"--require-paths-exist",
is_flag=True,
help="Require all input paths to exist",
)
@click.argument("job_bundle_dir")
@_handle_error
def bundle_submit(
@@ -98,6 +107,7 @@ def bundle_submit(
priority,
max_failed_tasks_count,
max_retries_per_task,
require_paths_exist,
**args,
):
"""
@@ -112,35 +122,54 @@
def _check_create_job_wait_canceled() -> bool:
return sigint_handler.continue_operation

def _decide_cancel_submission(
deviated_file_count_by_root: dict[str, int],
num_files: int,
upload_size: int,
):
def _decide_cancel_submission(upload_group: AssetUploadGroup) -> bool:
"""
Callback to decide if submission should be cancelled or not. Return 'True' to cancel.
Prints a warning that requires confirmation if paths are found outside of configured storage profile locations.
"""
warning_message = ""
for group in upload_group.asset_groups:
if not group.file_system_location_name:
warning_message += f"\n\nUnder the directory '{group.root_path}':"
warning_message += (
f"\n\t{len(group.inputs)} input file{'' if len(group.inputs) == 1 else 's'}"
if len(group.inputs) > 0
else ""
)
warning_message += (
f"\n\t{len(group.outputs)} output director{'y' if len(group.outputs) == 1 else 'ies'}"
if len(group.outputs) > 0
else ""
)
warning_message += (
f"\n\t{len(group.references)} referenced file{'' if len(group.references) == 1 else 's'} and/or director{'y' if len(group.outputs) == 1 else 'ies'}"
if len(group.references) > 0
else ""
)

# Exit early if there are no warnings and we've either set auto accept or there's no files to confirm
if not warning_message and (
yes
or config_file.str2bool(get_setting("settings.auto_accept", config=config))
or upload_group.total_input_files == 0
):
return False

message_text = (
f"Job submission contains {num_files} files totaling {_human_readable_file_size(upload_size)}. "
" All files will be uploaded to S3 if they are not already present in the job attachments bucket."
f"Job submission contains {upload_group.total_input_files} input files totaling {_human_readable_file_size(upload_group.total_input_bytes)}. "
" All input files will be uploaded to S3 if they are not already present in the job attachments bucket."
)
if deviated_file_count_by_root:
root_by_count_message = "\n\n".join(
[
f"{file_count} files from : '{directory}'"
for directory, file_count in deviated_file_count_by_root.items()
]
)
if warning_message:
message_text += (
f"\n\nFiles were found outside of the configured storage profile location(s). "
" Please confirm that you intend to upload files from the following directories:\n\n"
f"{root_by_count_message}"
f"\n\nFiles were specified outside of the configured storage profile location(s). "
" Please confirm that you intend to submit a job that uses files from the following directories:"
f"{warning_message}\n\n"
"To permanently remove this warning you must only use files located within a storage profile location."
)
message_text += "\n\nDo you wish to proceed?"
return (
not (yes or config_file.str2bool(get_setting("settings.auto_accept", config=config)))
and num_files > 0
and not click.confirm(
message_text,
default=not deviated_file_count_by_root,
)
return not click.confirm(
message_text,
default=not warning_message,
)

try:
@@ -158,6 +187,7 @@ def _decide_cancel_submission(
create_job_result_callback=_check_create_job_wait_canceled,
print_function_callback=click.echo,
decide_cancel_submission_callback=_decide_cancel_submission,
require_paths_exist=require_paths_exist,
)

# Check Whether the CLI options are modifying any of the default settings that affect
@@ -191,6 +221,10 @@ def _decide_cancel_submission(
raise DeadlineOperationError(
f"Failed to submit the job bundle to AWS Deadline Cloud:\n{exc}"
) from exc
except MisconfiguredInputsError as exc:
click.echo(str(exc))
click.echo("Job submission canceled.")
return
except Exception as exc:
api.get_deadline_cloud_library_telemetry_client().record_error(
event_details={"exception_scope": "on_submit"},
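The CLI surfaces the same behavior through the `--require-paths-exist` flag added above, e.g. `deadline bundle submit --require-paths-exist ./my_job_bundle` (command spelling and bundle path assumed here for illustration). Without the flag, missing input paths are only warned about and treated as referenced paths; with it, the new `except MisconfiguredInputsError` branch prints the error message and "Job submission canceled." instead of letting the failure fall through to the generic exception handler.
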
2 changes: 1 addition & 1 deletion src/deadline/client/ui/dialogs/deadline_config_dialog.py
@@ -280,7 +280,7 @@ def _build_farm_settings_ui(self, group, layout):

def _build_general_settings_ui(self, group, layout):
self.auto_accept = self._init_checkbox_setting(
group, layout, "settings.auto_accept", "Auto Accept Confirmation Prompts"
group, layout, "settings.auto_accept", "Auto Accept Prompt Defaults"
)
self.telemetry_opt_out = self._init_checkbox_setting(
group, layout, "telemetry.opt_out", "Telemetry Opt Out"