diff --git a/src/next/HISTORY.rst b/src/next/HISTORY.rst index c4263908e34..6311dc7d602 100644 --- a/src/next/HISTORY.rst +++ b/src/next/HISTORY.rst @@ -3,6 +3,11 @@ Release History =============== +0.1.3 +++++++ +* Support recommending similar E2E scenarios based on the recent multiple execution commands +* Add new parameters `--scenario/-s` and `--command/-c` to support specifying recommendation type (command recommendation or scenario recommendation) + 0.1.2 ++++++ * Fix the bug that the nested command history could not be logged diff --git a/src/next/azext_next/__init__.py b/src/next/azext_next/__init__.py index f9988b2837b..7e6dbc906c8 100644 --- a/src/next/azext_next/__init__.py +++ b/src/next/azext_next/__init__.py @@ -3,10 +3,11 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- +import threading + from azure.cli.core import AzCommandsLoader from azext_next._help import helps # pylint: disable=unused-import -import threading class NextCommandsLoader(AzCommandsLoader): @@ -14,7 +15,7 @@ class NextCommandsLoader(AzCommandsLoader): _instance_lock = threading.Lock() _has_reload_command_table = False - def __new__(cls, cli_ctx=None): + def __new__(cls, cli_ctx=None): # pylint: disable=unused-argument if not hasattr(NextCommandsLoader, "_instance"): with NextCommandsLoader._instance_lock: if not hasattr(NextCommandsLoader, "_instance"): @@ -25,8 +26,7 @@ def __init__(self, cli_ctx=None): from azure.cli.core.commands import CliCommandType next_custom = CliCommandType( operations_tmpl='azext_next.custom#{}') - super(NextCommandsLoader, self).__init__(cli_ctx=cli_ctx, - custom_command_type=next_custom) + super().__init__(cli_ctx=cli_ctx, custom_command_type=next_custom) # Because the help content of other modules needs to be loaded when executing "az next" # So modify the environment variable AZURE_CORE_USE_COMMAND_INDEX=False, and then reload the command table diff --git a/src/next/azext_next/_help.py b/src/next/azext_next/_help.py index b1e79df5e7e..c03c8abf78b 100644 --- a/src/next/azext_next/_help.py +++ b/src/next/azext_next/_help.py @@ -16,19 +16,22 @@ [1] az config set next.execute_in_prompt=True/False Turn on/off the step of executing recommended commands in interactive mode. Turn on by default. - [2] az config set next.filter_type=True/False - Turn on/off the step of filtering recommendation type. Turn off by default. + [2] az config set next.recommended_type=all/scenario/command + Set the default recommended type. All is the default. [3] az config set next.output=json/jsonc/none/table/tsv/yaml/yamlc/status Set default output format. Status is the default. - [4] az config set next.num_limit={amount_limit} - Set the limit of recommended items. 5 is the default. + [4] az config set next.command_num_limit={command_amount_limit} + Set the limit of recommended command items. 5 is the default. - [5] az config set next.show_arguments=True/False + [5] az config set next.scenario_num_limit={scenario_amount_limit} + Set the limit of recommended scenario items. 5 is the default. + + [6] az config set next.show_arguments=True/False Show/hide the arguments of recommended items. False is the default. - [6] az config set next.print_help=True/False + [7] az config set next.print_help=True/False Enable/disable whether to print help actively before executing each command. False is the default. """ diff --git a/src/next/azext_next/_params.py b/src/next/azext_next/_params.py index 94866f0a484..da32c7dceaa 100644 --- a/src/next/azext_next/_params.py +++ b/src/next/azext_next/_params.py @@ -8,4 +8,6 @@ def load_arguments(self, _): # pylint: disable=unused-argument - pass + with self.argument_context('next') as c: + c.argument('scenario_only', options_list=['--scenario', '-s'], action='store_true', help='Specify this parameter will only recommend E2E scenarios') + c.argument('command_only', options_list=['--command', '-c'], action='store_true', help='Specify this parameter will only recommend commands') diff --git a/src/next/azext_next/constants.py b/src/next/azext_next/constants.py index 4eb5e70a6be..647d158743a 100644 --- a/src/next/azext_next/constants.py +++ b/src/next/azext_next/constants.py @@ -11,3 +11,13 @@ class RecommendType(int, Enum): Solution = 2 Command = 3 Scenario = 4 + + @staticmethod + def get(name): + if name.lower() == "solution": + return RecommendType.Solution + if name.lower() == "command": + return RecommendType.Command + if name.lower() == "scenario": + return RecommendType.Scenario + return RecommendType.All diff --git a/src/next/azext_next/custom.py b/src/next/azext_next/custom.py index 5ee55640c44..53582d62483 100644 --- a/src/next/azext_next/custom.py +++ b/src/next/azext_next/custom.py @@ -2,41 +2,44 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -import re import json +import re +from azure.cli.core import telemetry from azure.cli.core.azclierror import RecommendationError +from azure.cli.core.style import Style, print_styled_text from knack import help_files -from .utils import (get_int_option, get_command_list, get_last_exception, get_title_case, get_yes_or_no_option, - get_latest_command) + from .constants import RecommendType -from colorama import Fore, init from .requests import get_recommend_from_api -from azure.cli.core.style import print_styled_text, Style -import azure.cli.core.telemetry as telemetry - +from .utils import (OptionRange, select_combined_option, get_command_list, + get_last_exception, get_latest_command, + capitalize_first_char, get_yes_or_no_option, select_option) -def handle_next(cmd): - init(autoreset=True) # turn on automatic color recovery for colorama - if cmd.cli_ctx.config.getboolean('next', 'filter_type', fallback=False): - request_type = _get_filter_option() +def handle_next(cmd, command_only=False, scenario_only=False): + if scenario_only: + request_type = RecommendType.Scenario.value + elif command_only: + request_type = RecommendType.Command.value else: - request_type = RecommendType.All.value + # Fallback to the configured filter_type if not command_only and scenario_only + request_type = RecommendType.get(cmd.cli_ctx.config.get('next', 'recommended_type', fallback='all')).value # Upload all execution commands of local record for personalized analysis command_history = get_command_list(cmd, 0) processed_exception = None - if request_type == RecommendType.All or request_type == RecommendType.Solution: + if request_type in (RecommendType.All, RecommendType.Solution): processed_exception = get_last_exception(cmd, get_latest_command(command_history)) if request_type == RecommendType.Solution and not processed_exception: _handle_error_no_exception_found() - return None + return recommends = get_recommend_from_api(command_history, request_type, - cmd.cli_ctx.config.getint('next', 'num_limit', fallback=5), + cmd.cli_ctx.config.getint('next', 'command_num_limit', fallback=5), + cmd.cli_ctx.config.getint('next', 'scenario_num_limit', fallback=5), error_info=processed_exception) if not recommends: send_feedback(request_type, -1, command_history, processed_exception) @@ -44,18 +47,54 @@ def handle_next(cmd): return print() - _give_recommends(cmd, recommends) - option = get_int_option("Please select your option " + Fore.LIGHTBLACK_EX + "(if none, enter 0)" + - Fore.RESET + ": ", 0, len(recommends), -1) - if option == 0: - send_feedback(request_type, 0, command_history, processed_exception, recommends) - print('\nThank you for your feedback. If you have more feedback, please submit it by using "az feedback" \n') - return - print() + # divide recommendation items into two sets + command_recommendations = [item for item in recommends if item['type'] != RecommendType.Scenario] + scenario_recommendations = [item for item in recommends if item['type'] == RecommendType.Scenario] + has_multi_type_recommendation = command_recommendations and scenario_recommendations + + if not has_multi_type_recommendation: + _give_recommends(cmd, recommends) + + option_msg = [(Style.PRIMARY, "Please select your option "), + (Style.SECONDARY, "(if none, enter 0)"), (Style.PRIMARY, ": ")] + option = select_option(option_msg, min_option=0, max_option=len(recommends), default_option=-1) + if option == 0: + send_feedback(request_type, 0, command_history, processed_exception, recommends) + print( + '\nThank you for your feedback. If you have more feedback, please submit it by using "az feedback" \n') + return + print() - rec = recommends[option - 1] - send_feedback(request_type, option, command_history, processed_exception, recommends, rec) + rec = recommends[option - 1] + send_feedback(request_type, option, command_history, processed_exception, recommends, rec) + else: + # display scenario recommendations with prefix 'b', and command recommendations with prefix 'a' + print_styled_text([(Style.PRIMARY, "COMMAND")]) + print() + _give_recommends(cmd, command_recommendations, prefix='a') + print_styled_text([(Style.PRIMARY, "SCENARIO")]) + print() + _give_recommends(cmd, scenario_recommendations, prefix='b') + option_msg = [(Style.PRIMARY, "Please select your option "), + (Style.SECONDARY, "(for example, enter \"b2\" for the second scenario. if none, enter 0)"), + (Style.PRIMARY, ": ")] + group, option = select_combined_option( + option_msg, + {'a': OptionRange(1, len(command_recommendations)), 'b': OptionRange(1, len(scenario_recommendations))}, + (None, -1)) + if group == 'a': + rec = command_recommendations[option - 1] + send_feedback(request_type, group + str(option), command_history, processed_exception, recommends, rec) + elif group == 'b': + rec = scenario_recommendations[option - 1] + send_feedback(request_type, group + str(option), command_history, processed_exception, recommends, rec) + else: + send_feedback(request_type, 0, command_history, processed_exception, recommends) + print( + '\nThank you for your feedback. If you have more feedback, please submit it by using "az feedback" \n') + return + print() if rec['type'] == RecommendType.Scenario: _show_details_for_e2e_scenario(cmd, rec) @@ -69,7 +108,7 @@ def handle_next(cmd): 'If you want to execute the commands in interactive mode, ' 'you can use "az config set next.execute_in_prompt=True" to set it up.\n') - return None + return def _handle_error_no_exception_found(): @@ -83,19 +122,17 @@ def _handle_error_no_exception_found(): az_error.print_error() -def _give_recommends(cmd, recommends): - idx = 0 - for rec in recommends: - idx += 1 +def _give_recommends(cmd, recommends, prefix=''): + for idx, rec in enumerate(recommends): if rec['type'] == RecommendType.Scenario: - _give_recommend_scenarios(idx, rec) + _give_recommend_scenarios(prefix + str(idx + 1), rec) else: - _give_recommend_commands(cmd, idx, rec) + _give_recommend_commands(cmd, prefix + str(idx + 1), rec) def _get_cmd_help_from_ctx(cmd, default_value): command_help = default_value - cmd_help = help_files._load_help_file(cmd) + cmd_help = help_files._load_help_file(cmd) # pylint: disable=protected-access if cmd_help and 'short-summary' in cmd_help: command_help = cmd_help['short-summary'] if command_help: @@ -104,7 +141,7 @@ def _get_cmd_help_from_ctx(cmd, default_value): def _feed_arguments_from_sample(rec): - cmd_help = help_files._load_help_file(rec['command']) + cmd_help = help_files._load_help_file(rec['command']) # pylint: disable=protected-access if cmd_help: if cmd_help['type'] == 'group': rec['arguments'] = ['-h'] @@ -141,7 +178,7 @@ def _give_recommend_commands(cmd, idx, rec): _feed_arguments_from_sample(rec) if 'arguments' in rec and cmd.cli_ctx.config.getboolean('next', 'show_arguments', fallback=False): - command_item = "{} {}".format(command_item, ' '.join(rec['arguments'])) + command_item = f"{command_item} {' '.join(rec['arguments'])}" print_styled_text([(Style.ACTION, index_str), (Style.PRIMARY, command_item)]) if 'reason' in rec: @@ -150,26 +187,29 @@ def _give_recommend_commands(cmd, idx, rec): reason = _get_cmd_help_from_ctx(rec['command'], "") if 'usage_condition' in rec and rec['usage_condition']: reason = reason + " (" + rec['usage_condition'] + ")" + reason = reason.rstrip(".") if reason: space_padding = re.sub('.', ' ', index_str) - print_styled_text([(Style.SECONDARY, space_padding + get_title_case(reason) + "\n")]) + print_styled_text([(Style.SECONDARY, space_padding + capitalize_first_char(reason) + "\n")]) else: print() def _give_recommend_scenarios(idx, rec): index_str = "[" + str(idx) + "] " - num_notice = " ({} Commands)".format(len(rec['nextCommandSet'])) + num_notice = f" ({len(rec['nextCommandSet'])} Commands)" print_styled_text([(Style.ACTION, index_str), (Style.PRIMARY, rec['scenario']), (Style.SECONDARY, num_notice)]) if 'reason' in rec: - reason = rec['reason'] + # Use the first sentence as reason to keep the reason short + reason = rec['reason'].split('.')[0] else: reason = "This is a set of commands that may help you complete this scenario." + reason = reason.rstrip(".") if reason: space_padding = re.sub('.', ' ', index_str) - print_styled_text([(Style.SECONDARY, space_padding + get_title_case(reason) + "\n")]) + print_styled_text([(Style.SECONDARY, space_padding + capitalize_first_char(reason) + "\n")]) else: print() @@ -190,7 +230,7 @@ def _execute_nx_cmd(cmd, nx_cmd, nx_param, catch_exception=False): if param in store_true_params: args.append(param) else: - print("Please input " + Fore.LIGHTBLUE_EX + param + Fore.RESET + ":", end='') + print_styled_text([(Style.ACTION, "Please input "), (Style.PRIMARY, param + ": ")], end='') value = input() if param == '': if value: @@ -206,7 +246,7 @@ def _execute_nx_cmd(cmd, nx_cmd, nx_param, catch_exception=False): if '--output' not in args and '-o' not in args: args.append('--output') - if 'status' == output_format: + if output_format == 'status': is_show_operation = False for operation in ['show', 'list', 'get', 'version']: if operation in nx_cmd: @@ -226,69 +266,88 @@ def _execute_nx_cmd(cmd, nx_cmd, nx_param, catch_exception=False): else: try: exit_code = cmd.cli_ctx.invoke(args) - except Exception: + except Exception: # pylint: disable=broad-except return -1 except SystemExit: return -1 - if 'status' == output_format and exit_code == 0: + if output_format == 'status' and exit_code == 0: from .utils import print_successful_styled_text + print() print_successful_styled_text('command completed\n') return exit_code -def _get_command_item_sample(rec): - if "example" in rec and rec["example"]: - example = Fore.LIGHTBLUE_EX + rec["example"] - example = re.sub('<', Fore.RESET + '<', example) - example = re.sub('>', '>' + Fore.LIGHTBLUE_EX, example) - return example +def _get_command_sample(command): + """Try getting example from command. Or load the example from `--help` if not found.""" + if "example" in command and command["example"]: + command_sample, _ = _format_command_sample(command["example"].replace(" $", " ")) + return command_sample - nx_param = [] - if "arguments" in rec and rec["arguments"]: - nx_param = rec["arguments"] - sorted_nx_param = sorted(nx_param) - cmd_help = help_files._load_help_file(rec['command']) + from knack import help_files + parameter = [] + if "arguments" in command and command["arguments"]: + parameter = command["arguments"] + sorted_param = sorted(parameter) + cmd_help = help_files._load_help_file(command['command']) # pylint: disable=protected-access if cmd_help and 'examples' in cmd_help and cmd_help['examples']: for cmd_example in cmd_help['examples']: - cmd_items = cmd_example['text'].split() - - arguments_start = False - example_arguments = [] - command_item = [] - argument_values = {} + command_sample, example_arguments = _format_command_sample(cmd_example['text']) + if sorted(example_arguments) == sorted_param: + return command_sample + + command = command["command"] if command["command"].startswith("az ") else "az " + command["command"] + command_sample = f"{command} {' '.join(parameter) if parameter else ''}" + return [(Style.PRIMARY, command_sample)] + + +def _format_command_sample(command_sample): + """ + Format command sample in the style of `az xxx --name `. + Also return the arguments used in the sample. + """ + if not command_sample: + return [], [] + + cmd_items = command_sample.split() + arguments_start = False + example_arguments = [] + command_item = [] + argument_values = {} + values = [] + for item in cmd_items: + if item.startswith('-'): + arguments_start = True + if values and example_arguments: + argument_values[example_arguments[-1]] = values values = [] - for item in cmd_items: - if item.startswith('-'): - arguments_start = True - if values and example_arguments: - argument_values[example_arguments[-1]] = values - values = [] - example_arguments.append(item) - elif not arguments_start: - command_item.append(item) - else: - values.append(item) - if values and example_arguments: - argument_values[example_arguments[-1]] = values + example_arguments.append(item) + elif not arguments_start: + command_item.append(item) + else: + values.append(item) + if values and example_arguments: + argument_values[example_arguments[-1]] = values - if sorted(example_arguments) == sorted_nx_param: - example = Fore.LIGHTBLUE_EX + ' '.join(command_item) - for argument in example_arguments: - example = example + " " + Fore.LIGHTBLUE_EX + argument - if argument in argument_values and argument_values[argument]: - example = example + Fore.RESET + ' <' + ' '.join(argument_values[argument]) + '>' - return example + formatted_example = [(Style.PRIMARY, ' '.join(command_item))] + for argument in example_arguments: + formatted_example.append((Style.PRIMARY, " " + argument)) + if argument in argument_values and argument_values[argument]: + argument_value = ' '.join(argument_values[argument]) + if not (argument_value.startswith('<') and argument_value.endswith('>')): + argument_value = '<' + argument_value + '>' + formatted_example.append((Style.WARNING, ' ' + argument_value)) - return Fore.LIGHTBLUE_EX + "az {} {}".format(rec["command"], ' '.join(nx_param) if nx_param else "") + return formatted_example, example_arguments def _execute_recommend_commands(cmd, rec): nx_param = [] if "arguments" in rec: nx_param = rec["arguments"] - print("\nRunning: " + _get_command_item_sample(rec)) + print_styled_text([(Style.ACTION, "Running: ")], end='') + print_styled_text(_get_command_sample(rec)) print_styled_text([(Style.SECONDARY, "Input Enter to skip unnecessary parameters")]) execute_result = _execute_nx_cmd(cmd, rec["command"], nx_param, catch_exception=True) is_help_printed = False @@ -297,7 +356,8 @@ def _execute_recommend_commands(cmd, rec): _print_help_info(cmd, rec["command"]) is_help_printed = True - step_msg = "Do you want to retry this command? " + Fore.LIGHTBLACK_EX + "(y/n)" + Fore.RESET + ": " + step_msg = [(Style.PRIMARY, "Do you want to retry this command? "), (Style.SECONDARY, "(y/n)"), + (Style.PRIMARY, ": ")] run_option = get_yes_or_no_option(step_msg) if run_option: execute_result = _execute_nx_cmd(cmd, rec["command"], nx_param, catch_exception=True) @@ -307,7 +367,9 @@ def _execute_recommend_commands(cmd, rec): def _execute_recommend_scenarios(cmd, rec): - for nx_cmd in rec["nextCommandSet"]: + exec_idx = rec.get("executeIndex") + for idx in exec_idx: + nx_cmd = rec["nextCommandSet"][idx] nx_param = [] if "arguments" in nx_cmd: nx_param = nx_cmd["arguments"] @@ -315,10 +377,11 @@ def _execute_recommend_scenarios(cmd, rec): if cmd.cli_ctx.config.getboolean('next', 'print_help', fallback=False): _print_help_info(cmd, nx_cmd["command"]) - print("\nRunning: " + _get_command_item_sample(nx_cmd)) - step_msg = "How do you want to run this step? 1. Run it 2. Skip it 3. Quit process " + Fore.LIGHTBLACK_EX \ - + "(Enter is to Run)" + Fore.RESET + ": " - run_option = get_int_option(step_msg, 1, 3, 1) + print_styled_text([(Style.ACTION, "Running: ")], end='') + print_styled_text(_get_command_sample(nx_cmd)) + option_msg = [(Style.PRIMARY, "How do you want to run this step? 1. Run it 2. Skip it 3. Quit process "), + (Style.SECONDARY, "(Enter is to Run)"), (Style.PRIMARY, ": ")] + run_option = select_option(option_msg, min_option=1, max_option=3, default_option=1) if run_option == 1: print_styled_text([(Style.SECONDARY, "Input Enter to skip unnecessary parameters")]) execute_result = _execute_nx_cmd(cmd, nx_cmd['command'], nx_param, catch_exception=True) @@ -328,9 +391,10 @@ def _execute_recommend_scenarios(cmd, rec): _print_help_info(cmd, nx_cmd["command"]) is_help_printed = True - step_msg = "Do you want to retry this step? 1. Run it 2. Skip it 3. Quit process " + Fore.LIGHTBLACK_EX \ - + "(Enter is to Run)" + Fore.RESET + ": " - run_option = get_int_option(step_msg, 1, 3, 1) + option_msg = [ + (Style.PRIMARY, "How do you want to run this step? 1. Run it 2. Skip it 3. Quit process "), + (Style.SECONDARY, "(Enter is to Run)"), (Style.PRIMARY, ": ")] + run_option = select_option(option_msg, min_option=1, max_option=3, default_option=1) if run_option == 1: execute_result = _execute_nx_cmd(cmd, nx_cmd['command'], nx_param, catch_exception=True) elif run_option == 2: @@ -350,34 +414,24 @@ def _execute_recommend_scenarios(cmd, rec): print_successful_styled_text('All commands in this scenario have been executed! \n') -def _get_filter_option(): - msg = ''' -Please select the type of recommendation you need: -1. all: It will intelligently analyze the types of recommendation you need, and may recommend multiple types of command to you -2. solution: Only the solutions to problems when errors occur are recommend -3. command: Only the commands with high correlation with previously executed commands are recommend -4. scenario: Only the E2E scenarios related to current usage scenarios are recommended -''' - print(msg) - return get_int_option("What kind of recommendation do you want? " + Fore.LIGHTBLACK_EX + "(RETURN is to set all)" + - Fore.RESET + ": ", 1, 4, 1) - - def _show_details_for_e2e_scenario(cmd, rec): - print_styled_text([(Style.PRIMARY, rec['scenario']), - (Style.ACTION, " contains the following commands:\n")]) + print_styled_text([(Style.WARNING, rec['scenario']), + (Style.PRIMARY, " contains the following commands:\n")]) nx_cmd_set = rec["nextCommandSet"] - idx = 0 - for nx_cmd in nx_cmd_set: - idx += 1 + exec_idx = rec.get("executeIndex") + for idx, nx_cmd in enumerate(nx_cmd_set): command_item = "az " + nx_cmd['command'] if 'arguments' in nx_cmd and cmd.cli_ctx.config.getboolean('next', 'show_arguments', fallback=False): - command_item = "{} {}".format(command_item, ' '.join(nx_cmd['arguments'])) - print(command_item) + command_item = f"{command_item} {' '.join(nx_cmd['arguments'])}" + cmd_active = idx in exec_idx + styled_command = [(Style.ACTION, " > "), (Style.PRIMARY, command_item)] + if not cmd_active: + styled_command.append((Style.WARNING, " (executed)")) + print_styled_text(styled_command) if nx_cmd['reason']: - print_styled_text([(Style.SECONDARY, get_title_case(nx_cmd['reason']) + "\n")]) + print_styled_text([(Style.SECONDARY, capitalize_first_char(nx_cmd['reason']) + "\n")]) else: print() diff --git a/src/next/azext_next/requests.py b/src/next/azext_next/requests.py index e2b936c8a7e..8af2f513f4f 100644 --- a/src/next/azext_next/requests.py +++ b/src/next/azext_next/requests.py @@ -10,7 +10,7 @@ # pylint: disable=protected-access -def get_recommend_from_api(command_list, type, top_num=5, error_info=None): # pylint: disable=unused-argument +def get_recommend_from_api(command_list, recommend_type, command_top_num=5, scenario_top_num=5, error_info=None): # pylint: disable=unused-argument '''query next command from web api''' import requests url = "https://cli-recommendation.azurewebsites.net/api/RecommendationService" @@ -19,8 +19,9 @@ def get_recommend_from_api(command_list, type, top_num=5, error_info=None): # p hashed_user_id = hashlib.sha256(user_id.encode('utf-8')).hexdigest() payload = { "command_list": json.dumps(command_list), - "type": type, - "top_num": top_num, + "type": recommend_type, + "command_top_num": command_top_num, + "scenario_top_num": scenario_top_num, 'error_info': error_info, 'cli_version': version, 'user_id': hashed_user_id @@ -37,8 +38,7 @@ def get_recommend_from_api(command_list, type, top_num=5, error_info=None): # p response = requests.post(url, json.dumps(payload)) if response.status_code != 200: raise RecommendationError( - "Failed to connect to '{}' with status code '{}' and reason '{}'".format( - url, response.status_code, response.reason)) + f"Failed to connect to '{url}' with status code '{response.status_code}' and reason '{response.reason}'") recommends = [] if 'data' in response.json(): diff --git a/src/next/azext_next/utils.py b/src/next/azext_next/utils.py index d9b84252dd5..ed9386f062e 100644 --- a/src/next/azext_next/utils.py +++ b/src/next/azext_next/utils.py @@ -2,24 +2,50 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -from colorama import Fore -import os import json +import os + +from azure.cli.core.style import print_styled_text, Style -def read_int(default_value=0): +def input_int(default_value=0): + """Read an int from `stdin`. Retry if input is not a number""" ret = input() if ret == '' or ret is None: return default_value while not ret.isnumeric(): - ret = input("Please input a legal number: ") + ret = input("Please input a valid number: ") if ret == '' or ret is None: return default_value return int(ret) -def get_yes_or_no_option(option_description): - print(Fore.LIGHTBLUE_EX + ' ? ' + Fore.RESET + option_description, end='') +def input_combined_option(group_range, default_value=(None, 0)): + """ + Read combined option from stdin, ensure the group_name is in range or be None and option is a valid number + :param group_range: the range a valid group name should be in + :param default_value: + :return: the combined option read from stdin + :rtype: tuple[str|None, int] + """ + ret = input() + if ret == '' or ret is None: + return default_value + if ret.isnumeric(): + return None, int(ret) + while True: + group = ret[:1] + option = ret[1:] + if group not in group_range: + ret = input("The option should start with " + " or ".join(group_range) + ":") + elif not option.isnumeric(): + ret = input("The option should end with a valid number: ") + else: + return group, int(option) + + +def get_yes_or_no_option(option_msg): + print_styled_text([(Style.ACTION, " ? ")] + option_msg, end='') option = input() yes_options = ["y", "yes", "Y", "Yes", "YES"] no_options = ["n", "no", "N", "No", "NO"] @@ -28,12 +54,54 @@ def get_yes_or_no_option(option_description): return option in yes_options -def get_int_option(option_description, min_option, max_option, default_option): - print(Fore.LIGHTBLUE_EX + ' ? ' + Fore.RESET + option_description, end='') - option = read_int(default_option) +class OptionRange: # pylint: disable=too-few-public-methods + def __init__(self, min_option, max_option): + self.min_option = min_option + self.max_option = max_option + + def __contains__(self, item): + return self.min_option <= item <= self.max_option + + +def select_combined_option(option_msg, valid_groups, default_option): + """ + Read a combined option from stdin. + Request user to re-enter if not in valid_group or range + + :param option_msg: styled description to display + :param valid_groups: a dict for valid group name and option range + :type valid_groups: dict[str, OptionRange] + :param default_option: default option if users input empty str + :type default_option: tuple[str|None, int] + :return: the group name and option the user input, which is ensured in valid groups or be (None, 0) + :rtype: tuple[str|None, int] + """ + print_styled_text([(Style.ACTION, " ? ")] + option_msg, end='') + group, option = input_combined_option(valid_groups.keys(), default_option) + while True: + if group is None: + if option == 0: + return group, option + print("The option should start with \"a\" for scenario, \"b\" for commands or be \"0\": ", end='') + group, option = input_combined_option(valid_groups.keys(), default_option) + elif option not in valid_groups[group]: + print(f"The option should end with a valid option " + f"({group}{valid_groups[group].min_option}-{group}{valid_groups[group].max_option}): ", end='') + group, option = input_combined_option(valid_groups.keys(), default_option) + else: + return group, option + + +def select_option(option_msg, min_option, max_option, default_option): + """Read an option from `stdin` ranging from `min_option` to `max_option`. + Retry if input is out of range. + """ + print_styled_text([(Style.ACTION, " ? ")] + option_msg, end='') + option = input_int(default_option) while option < min_option or option > max_option: - print("Please enter a valid option ({}-{}): ".format(min_option, max_option), end='') - option = read_int(default_option) + print_styled_text([(Style.PRIMARY, f"Please enter a valid option ({min_option}-{max_option}): ")], + end='') + option = input_int(default_option) return option @@ -42,7 +110,7 @@ def get_command_list(cmd, num=2): history_file_name = os.path.join(cmd.cli_ctx.config.config_dir, 'recommendation', 'cmd_history.log') if not os.path.exists(history_file_name): return _get_command_list_from_core(cmd, num) - with open(history_file_name, "r") as f: + with open(history_file_name, "r", encoding="utf-8") as f: lines = f.read().splitlines() lines = [x for x in lines if x != 'next'] return lines[-num:] @@ -77,7 +145,7 @@ def _parse_command_file(command_file_path): if not os.path.exists(command_file_path): return "" - with open(command_file_path, "r") as f: + with open(command_file_path, "r", encoding="utf-8") as f: first_line = f.readline() if not first_line: return "" @@ -126,12 +194,11 @@ def get_last_exception(cmd, latest_command): if not cmd.cli_ctx.config.getboolean('core', 'collect_telemetry', fallback=True): return '' - import os telemetry_cache_file = os.path.join(cmd.cli_ctx.config.config_dir, 'telemetry', 'cache') if not os.path.exists(telemetry_cache_file): return '' - with open(telemetry_cache_file, "r") as f: + with open(telemetry_cache_file, "r", encoding="utf-8") as f: lines = f.read().splitlines() history_data_list = reversed(lines) for history_data_item in history_data_list: @@ -145,11 +212,11 @@ def get_last_exception(cmd, latest_command): import ast data_dict = ast.literal_eval(record_data[1]) if not data_dict or len(data_dict) != 1: - return + return '' - data_item = [item for item in data_dict.values()][0][0] + data_item = list(data_dict.values())[0][0] if not data_item or 'properties' not in data_item: - return + return '' properties = data_item['properties'] command_key = 'Context.Default.AzureCLI.RawCommand' @@ -177,26 +244,26 @@ def get_last_exception(cmd, latest_command): return '' -def get_title_case(str): - if not str: - return str - str = str.strip() - return str[0].upper() + str[1:] +def capitalize_first_char(string): + if not string: + return string + string = string.strip() + return string[0].upper() + string[1:] def print_successful_styled_text(message): - from azure.cli.core.style import print_styled_text, Style, is_modern_terminal + from azure.cli.core.style import (Style, is_modern_terminal, + print_styled_text) - prefix_text = '\nDone: ' + prefix_text = 'Done: ' if is_modern_terminal(): - prefix_text = '\n(✓ )Done: ' + prefix_text = '(✓)Done: ' print_styled_text([(Style.SUCCESS, prefix_text), (Style.PRIMARY, message)]) def log_command_history(command, args): - import os - from knack.util import ensure_dir from azure.cli.core._environment import get_config_dir + from knack.util import ensure_dir if not args or '--no-log' in args: return @@ -209,15 +276,15 @@ def log_command_history(command, args): file_path = os.path.join(base_dir, 'cmd_history.log') if not os.path.exists(file_path): - with open(file_path, 'w') as fd: + with open(file_path, 'w', encoding='utf-8') as fd: fd.write('') lines = [] - with open(file_path, 'r') as fd: + with open(file_path, 'r', encoding='utf-8') as fd: lines = fd.readlines() lines = [x.strip('\n') for x in lines if x] - with open(file_path, 'w') as fd: + with open(file_path, 'w', encoding='utf-8') as fd: command_info = {'command': command} params = [] for arg in args: diff --git a/src/next/setup.py b/src/next/setup.py index 8d9d1b24769..31cf88d55d4 100644 --- a/src/next/setup.py +++ b/src/next/setup.py @@ -16,7 +16,7 @@ # TODO: Confirm this is the right version number you want and it matches your # HISTORY.rst entry. -VERSION = '0.1.2' +VERSION = '0.1.3' # The full list of classifiers is available at # https://pypi.python.org/pypi?%3Aaction=list_classifiers