diff --git a/mreg_cli/exceptions.py b/mreg_cli/exceptions.py index 5f28f095..ba5b3534 100644 --- a/mreg_cli/exceptions.py +++ b/mreg_cli/exceptions.py @@ -58,3 +58,20 @@ class NetworkNotFoundWarning(CliWarning): """Warning class for network not found.""" pass + + +class LoginFailedError(CliException): + """Error class for login failure.""" + + def formatted_exception(self) -> str: + """Return a string formatted with 'Login failed:' prefixing the error message. + + :returns: Formatted error message. + """ + return f"Login failed: {super().__str__()}" + + def __str__(self) -> str: + """Return the error message.""" + return "Login failed" + + pass diff --git a/mreg_cli/main.py b/mreg_cli/main.py index 033a97e0..e32b65f6 100644 --- a/mreg_cli/main.py +++ b/mreg_cli/main.py @@ -11,8 +11,9 @@ import mreg_cli.utilities.api as api from mreg_cli.cli import cli, source from mreg_cli.config import MregCliConfig +from mreg_cli.exceptions import LoginFailedError from mreg_cli.outputmanager import OutputManager -from mreg_cli.utilities.api import login1 +from mreg_cli.utilities.api import try_token_or_login from . import log @@ -134,9 +135,9 @@ def main(): return try: - login1(config.get("user"), config.get("url")) - except (EOFError, KeyboardInterrupt): - print("") + try_token_or_login(config.get("user"), config.get("url")) + except (EOFError, KeyboardInterrupt, LoginFailedError) as e: + print(e) raise SystemExit() from None if args.show_token: print(api.session.headers["Authorization"]) diff --git a/mreg_cli/utilities/api.py b/mreg_cli/utilities/api.py index 1fdb06fa..3719fe81 100644 --- a/mreg_cli/utilities/api.py +++ b/mreg_cli/utilities/api.py @@ -23,7 +23,7 @@ from prompt_toolkit import prompt from mreg_cli.config import MregCliConfig -from mreg_cli.exceptions import CliError +from mreg_cli.exceptions import CliError, LoginFailedError from mreg_cli.log import cli_error, cli_warning session = requests.Session() @@ -52,8 +52,18 @@ def set_file_permissions(f: str, mode: int) -> None: pass -def login1(user: str, url: str) -> None: - """Login to MREG.""" +def try_token_or_login(user: str, url: str) -> None: + """Check for a valid token or interactively log in to MREG. + + Exits on connection failure. + + :param user: Username to login with. + :param url: URL to MREG. + + :raises LoginFailedError: If login fails. + + :returns: Nothing. + """ if os.path.isfile(mreg_auth_token_file): try: with open(mreg_auth_token_file, encoding="utf-8") as tokenfile: @@ -80,18 +90,29 @@ def login1(user: str, url: str) -> None: error(f"Could not connect to {url}") if ret.status_code == 401: - login(user, url) + prompt_for_password_and_login(user, url, catch_exception=False) + +def prompt_for_password_and_login(user: str, url: str, catch_exception: bool = True) -> None: + """Login to MREG. -def login(user: str, url: str) -> None: - """Login to MREG.""" + :param user: Username to login with. + :param url: URL to MREG. + :param catch_exception: If True, login errors are caught, otherwise they are passed on. + + :raises LoginFailedError: If login fails and catch_exception is False. + + :returns: Nothing. + """ print(f"Connecting to {url}") - # get url password = prompt(f"Password for {user}: ", is_password=True) try: - _update_token(user, password) + auth_and_update_token(user, password) except CliError as e: - e.print_self() + if catch_exception: + e.print_self() + else: + raise LoginFailedError() from e def logout() -> None: @@ -104,16 +125,16 @@ def logout() -> None: pass -def update_token() -> None: - """Update the token.""" +def prompt_for_password_and_try_update_token() -> None: + """Prompt for a password and try to update the token.""" password = prompt("You need to re-autenticate\nEnter password: ", is_password=True) try: - _update_token(MregCliConfig().get("user"), password) + auth_and_update_token(MregCliConfig().get("user"), password) except CliError as e: e.print_self() -def _update_token(username: Optional[str], password: str) -> None: +def auth_and_update_token(username: Optional[str], password: str) -> None: """Perform the actual token update.""" tokenurl = urljoin(MregCliConfig().get_url(), "/api/token-auth/") try: @@ -182,8 +203,8 @@ def _request_wrapper( OutputManager().recording_request(operation_type, url, params, data, result) if first and result.status_code == 401: - update_token() - return _request_wrapper(operation_type, path, first=False, **data) + prompt_for_password_and_try_update_token() + return _request_wrapper(operation_type, path, params=params, first=False, **data) elif result.status_code == 404 and ok404: return None