From b982f55a62b024be0cc2e14f73d48ee2bcad42a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthieu=20T=C3=A2che?= Date: Fri, 10 Nov 2023 22:39:01 +0100 Subject: [PATCH] fix: improve error handling and ansible group parsing of anta get from-ansible (#449) --------- Co-authored-by: gmuloc --- anta/cli/get/commands.py | 21 +++-- anta/cli/get/utils.py | 67 ++++++++------ tests/data/ansible_inventory.yml | 47 ++++++++++ tests/data/empty_ansible_inventory.yml | 1 + tests/units/cli/get/test_commands.py | 62 ++++++++++++- tests/units/cli/get/test_utils.py | 119 +++++++++++++++++++++++++ 6 files changed, 280 insertions(+), 37 deletions(-) create mode 100644 tests/data/ansible_inventory.yml create mode 100644 tests/data/empty_ansible_inventory.yml create mode 100644 tests/units/cli/get/test_utils.py diff --git a/anta/cli/get/commands.py b/anta/cli/get/commands.py index 5dddfbaaf..8c9cb4814 100644 --- a/anta/cli/get/commands.py +++ b/anta/cli/get/commands.py @@ -22,7 +22,7 @@ from rich.pretty import pretty_repr from anta.cli.console import console -from anta.cli.utils import parse_tags +from anta.cli.utils import ExitCode, parse_tags from .utils import create_inventory_from_ansible, create_inventory_from_cvp, get_cv_token @@ -74,7 +74,8 @@ def from_cvp(inventory_directory: str, cvp_ip: str, cvp_username: str, cvp_passw @click.command(no_args_is_help=True) -@click.option("--ansible-group", "-g", help="Ansible group to filter", type=str, required=False) +@click.pass_context +@click.option("--ansible-group", "-g", help="Ansible group to filter", type=str, required=False, default="all") @click.option( "--ansible-inventory", "-i", @@ -89,17 +90,21 @@ def from_cvp(inventory_directory: str, cvp_ip: str, cvp_username: str, cvp_passw help="Path to save inventory file", type=click.Path(file_okay=True, dir_okay=False, exists=False, writable=True, path_type=Path), ) -def from_ansible(output: Path, ansible_inventory: Path, ansible_group: str) -> None: +def from_ansible(ctx: click.Context, output: Path, ansible_inventory: Path, ansible_group: str) -> None: """Build ANTA inventory from an ansible inventory YAML file""" logger.info(f"Building inventory from ansible file {ansible_inventory}") # Create output directory output.parent.mkdir(parents=True, exist_ok=True) - create_inventory_from_ansible( - inventory=ansible_inventory, - output_file=output, - ansible_root=ansible_group, - ) + try: + create_inventory_from_ansible( + inventory=ansible_inventory, + output_file=output, + ansible_group=ansible_group, + ) + except ValueError as e: + logger.error(str(e)) + ctx.exit(ExitCode.USAGE_ERROR) @click.command() diff --git a/anta/cli/get/utils.py b/anta/cli/get/utils.py index 655a04f72..d1900b057 100644 --- a/anta/cli/get/utils.py +++ b/anta/cli/get/utils.py @@ -1,7 +1,6 @@ # Copyright (c) 2023 Arista Networks, Inc. # Use of this source code is governed by the Apache License 2.0 # that can be found in the LICENSE file. - """ Utils functions to use with anta.cli.get.commands module. """ @@ -10,13 +9,14 @@ import json import logging from pathlib import Path -from typing import Any, Union +from typing import Any import requests import urllib3 import yaml -from ...inventory import AntaInventory +from anta.inventory import AntaInventory +from anta.inventory.models import AntaInventoryHost, AntaInventoryInput urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -25,6 +25,7 @@ def get_cv_token(cvp_ip: str, cvp_username: str, cvp_password: str) -> str: """Generate AUTH token from CVP using password""" + # TODO, need to handle requests eror # use CVP REST API to generate a token URL = f"https://{cvp_ip}/cvpservice/login/authenticate.do" @@ -35,7 +36,7 @@ def get_cv_token(cvp_ip: str, cvp_username: str, cvp_password: str) -> str: return response.json()["sessionId"] -def create_inventory_from_cvp(inv: list[dict[str, Any]], directory: str, container: str) -> None: +def create_inventory_from_cvp(inv: list[dict[str, Any]], directory: str, container: str | None = None) -> None: """ create an inventory file from Arista CloudVision """ @@ -52,44 +53,56 @@ def create_inventory_from_cvp(inv: list[dict[str, Any]], directory: str, contain logger.info(f"Inventory file has been created in {out_file}") -def create_inventory_from_ansible(inventory: Path, output_file: Path, ansible_root: Union[str, None] = None) -> None: +def create_inventory_from_ansible(inventory: Path, output_file: Path, ansible_group: str = "all") -> None: """ Create an ANTA inventory from an Ansible inventory YAML file Args: - inventory (str): Ansible Inventory file to read - output_file (str, optional): ANTA inventory file to generate. - ansible_root (Union[str, None], optional): Ansible group from where to extract data. Defaults to None. + inventory: Ansible Inventory file to read + output_file: ANTA inventory file to generate. + ansible_root: Ansible group from where to extract data. """ - def deep_yaml_parsing(data: dict[str, Any], hosts: Union[None, list[dict[str, str]]] = None) -> Union[None, list[dict[str, str]]]: + def find_ansible_group(data: dict[str, Any], group: str) -> dict[str, Any] | None: + for k, v in data.items(): + if isinstance(v, dict): + if k == group and ("children" in v.keys() or "hosts" in v.keys()): + return v + d = find_ansible_group(v, group) + if d is not None: + return d + return None + + def deep_yaml_parsing(data: dict[str, Any], hosts: list[AntaInventoryHost] | None = None) -> list[AntaInventoryHost]: """Deep parsing of YAML file to extract hosts and associated IPs""" if hosts is None: hosts = [] for key, value in data.items(): if isinstance(value, dict) and "ansible_host" in value.keys(): - hosts.append({"name": key, "host": value["ansible_host"]}) + logger.info(f" * adding entry for {key}") + hosts.append(AntaInventoryHost(name=key, host=value["ansible_host"])) elif isinstance(value, dict): deep_yaml_parsing(value, hosts) else: return hosts return hosts - i: dict[str, dict[str, Any]] = {AntaInventory.INVENTORY_ROOT_KEY: {"hosts": []}} - with open(inventory, encoding="utf-8") as inv: - ansible_inventory = yaml.safe_load(inv) - if ansible_root not in ansible_inventory.keys(): - logger.error(f"Group {ansible_root} not in ansible inventory {inventory}") - raise ValueError(f"Group {ansible_root} not in ansible inventory {inventory}") - if ansible_root is None: - ansible_hosts = deep_yaml_parsing(ansible_inventory, hosts=[]) - else: - ansible_hosts = deep_yaml_parsing(ansible_inventory[ansible_root], hosts=[]) - if ansible_hosts is None: - ansible_hosts = [] - for dev in ansible_hosts: - logger.info(f' * adding entry for {dev["name"]}') - i[AntaInventory.INVENTORY_ROOT_KEY]["hosts"].append({"host": dev["host"], "name": dev["name"]}) + try: + with open(inventory, encoding="utf-8") as inv: + ansible_inventory = yaml.safe_load(inv) + except OSError as exc: + raise ValueError(f"Could not parse {inventory}.") from exc + + if not ansible_inventory: + raise ValueError(f"Ansible inventory {inventory} is empty") + + ansible_inventory = find_ansible_group(ansible_inventory, ansible_group) + + if ansible_inventory is None: + raise ValueError(f"Group {ansible_group} not found in Ansible inventory") + ansible_hosts = deep_yaml_parsing(ansible_inventory) + i = AntaInventoryInput(hosts=ansible_hosts) + # TODO, catch issue with open(output_file, "w", encoding="UTF-8") as out_fd: - out_fd.write(yaml.dump(i)) - logger.info(f"Inventory file has been created in {output_file}") + out_fd.write(yaml.dump({AntaInventory.INVENTORY_ROOT_KEY: i.model_dump(exclude_unset=True)})) + logger.info(f"ANTA device inventory file has been created in {output_file}") diff --git a/tests/data/ansible_inventory.yml b/tests/data/ansible_inventory.yml new file mode 100644 index 000000000..a95850518 --- /dev/null +++ b/tests/data/ansible_inventory.yml @@ -0,0 +1,47 @@ +--- +all: + children: + cv_servers: + hosts: + cv_atd1: + ansible_host: 10.73.1.238 + ansible_user: tom + ansible_password: arista123 + cv_collection: v3 + ATD_LAB: + vars: + ansible_user: arista + ansible_ssh_pass: arista + children: + ATD_FABRIC: + children: + ATD_SPINES: + vars: + type: spine + hosts: + spine1: + ansible_host: 192.168.0.10 + spine2: + ansible_host: 192.168.0.11 + ATD_LEAFS: + vars: + type: l3leaf + children: + pod1: + hosts: + leaf1: + ansible_host: 192.168.0.12 + leaf2: + ansible_host: 192.168.0.13 + pod2: + hosts: + leaf3: + ansible_host: 192.168.0.14 + leaf4: + ansible_host: 192.168.0.15 + ATD_TENANTS_NETWORKS: + children: + ATD_LEAFS: + ATD_SERVERS: + children: + ATD_LEAFS: diff --git a/tests/data/empty_ansible_inventory.yml b/tests/data/empty_ansible_inventory.yml new file mode 100644 index 000000000..ed97d539c --- /dev/null +++ b/tests/data/empty_ansible_inventory.yml @@ -0,0 +1 @@ +--- diff --git a/tests/units/cli/get/test_commands.py b/tests/units/cli/get/test_commands.py index fc877d889..81761342d 100644 --- a/tests/units/cli/get/test_commands.py +++ b/tests/units/cli/get/test_commands.py @@ -4,9 +4,9 @@ """ Tests for anta.cli.get.commands """ - from __future__ import annotations +import os from pathlib import Path from typing import TYPE_CHECKING, cast from unittest.mock import ANY, patch @@ -16,13 +16,15 @@ from cvprac.cvp_client_errors import CvpApiError from anta.cli import anta -from anta.cli.get.commands import from_cvp +from anta.cli.get.commands import from_ansible, from_cvp from tests.lib.utils import default_anta_env if TYPE_CHECKING: from click.testing import CliRunner from pytest import CaptureFixture, LogCaptureFixture +DATA_DIR: Path = Path(__file__).parents[3].resolve() / "data" + # Not testing for required parameter, click does this well. @pytest.mark.parametrize( @@ -95,3 +97,59 @@ def mock_cvp_connect(self: CvpClient, *args: str, **kwargs: str) -> None: else: assert "Error connecting to cvp" in caplog.text assert result.exit_code == 1 + + +@pytest.mark.parametrize( + "ansible_inventory, ansible_group, output, expected_exit", + [ + pytest.param("ansible_inventory.yml", None, None, 0, id="no group"), + pytest.param("ansible_inventory.yml", "ATD_LEAFS", None, 0, id="group found"), + pytest.param("ansible_inventory.yml", "DUMMY", None, 4, id="group not found"), + pytest.param("empty_ansible_inventory.yml", None, None, 4, id="empty inventory"), + ], +) +# pylint: disable-next=too-many-arguments +def test_from_ansible( + tmp_path: Path, + caplog: LogCaptureFixture, + capsys: CaptureFixture[str], + click_runner: CliRunner, + ansible_inventory: Path, + ansible_group: str | None, + output: Path | None, + expected_exit: int, +) -> None: + """ + Test `anta get from-ansible` + """ + env = default_anta_env() + cli_args = ["get", "from-ansible"] + + os.chdir(tmp_path) + if output is not None: + cli_args.extend(["--output", str(output)]) + out_dir = Path() / output + else: + # Get inventory-directory default + default_dir: Path = cast(Path, from_ansible.params[2].default) + out_dir = Path() / default_dir + + if ansible_inventory is not None: + ansible_inventory_path = DATA_DIR / ansible_inventory + cli_args.extend(["--ansible-inventory", str(ansible_inventory_path)]) + + if ansible_group is not None: + cli_args.extend(["--ansible-group", ansible_group]) + + with capsys.disabled(): + print(cli_args) + result = click_runner.invoke(anta, cli_args, env=env, auto_envvar_prefix="ANTA") + + print(result) + + assert result.exit_code == expected_exit + print(caplog.records) + if expected_exit != 0: + assert len(caplog.records) == 2 + else: + assert out_dir.exists() diff --git a/tests/units/cli/get/test_utils.py b/tests/units/cli/get/test_utils.py new file mode 100644 index 000000000..36f49354d --- /dev/null +++ b/tests/units/cli/get/test_utils.py @@ -0,0 +1,119 @@ +# Copyright (c) 2023 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +""" +Tests for anta.cli.get.utils +""" +from __future__ import annotations + +from contextlib import nullcontext +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest +import requests + +from anta.cli.get.utils import create_inventory_from_ansible, create_inventory_from_cvp, get_cv_token +from anta.inventory import AntaInventory + +DATA_DIR: Path = Path(__file__).parents[3].resolve() / "data" + + +def test_get_cv_token() -> None: + """ + Test anta.get.utils.get_cv_token + """ + ip = "42.42.42.42" + username = "ant" + password = "formica" + + with patch("anta.cli.get.utils.requests.request") as patched_request: + mocked_ret = MagicMock(autospec=requests.Response) + mocked_ret.json.return_value = {"sessionId": "simple"} + patched_request.return_value = mocked_ret + res = get_cv_token(ip, username, password) + patched_request.assert_called_once_with( + "POST", + "https://42.42.42.42/cvpservice/login/authenticate.do", + headers={"Content-Type": "application/json", "Accept": "application/json"}, + data='{"userId": "ant", "password": "formica"}', + verify=False, + timeout=10, + ) + assert res == "simple" + + +# truncated inventories +CVP_INVENTORY = [ + { + "hostname": "device1", + "containerName": "DC1", + "ipAddress": "10.20.20.97", + }, + { + "hostname": "device2", + "containerName": "DC2", + "ipAddress": "10.20.20.98", + }, + { + "hostname": "device3", + "containerName": "", + "ipAddress": "10.20.20.99", + }, +] + + +@pytest.mark.parametrize( + "cvp_container, inventory", + [ + pytest.param(None, CVP_INVENTORY, id="no container"), + pytest.param("DC1", CVP_INVENTORY, id="some container"), + pytest.param(None, [], id="empty_inventory"), + ], +) +def test_create_inventory_from_cvp(tmp_path: Path, cvp_container: str | None, inventory: list[dict[str, Any]]) -> None: + """ + Test anta.get.utils.create_inventory_from_cvp + """ + target_dir = tmp_path / "inventory" + target_dir.mkdir() + + create_inventory_from_cvp(inventory, str(target_dir), cvp_container) + + expected_inventory_file_name = "inventory.yml" if cvp_container is None else f"inventory-{cvp_container}.yml" + expected_inventory_path = target_dir / expected_inventory_file_name + assert expected_inventory_path.exists() + # This validate the file structure ;) + inv = AntaInventory().parse(str(expected_inventory_path), "user", "pass") + assert len(inv) == len(inventory) + + +@pytest.mark.parametrize( + "inventory_filename, ansible_group, expected_raise, expected_inv_length", + [ + pytest.param("ansible_inventory.yml", None, nullcontext(), 7, id="no group"), + pytest.param("ansible_inventory.yml", "ATD_LEAFS", nullcontext(), 4, id="group found"), + pytest.param("ansible_inventory.yml", "DUMMY", pytest.raises(ValueError, match="Group DUMMY not found in Ansible inventory"), 0, id="group not found"), + pytest.param("empty_ansible_inventory.yml", None, pytest.raises(ValueError, match="Ansible inventory .* is empty"), 0, id="empty inventory"), + pytest.param("wrong_ansible_inventory.yml", None, pytest.raises(ValueError, match="Could not parse"), 0, id="os error inventory"), + ], +) +def test_create_inventory_from_ansible(tmp_path: Path, inventory_filename: Path, ansible_group: str | None, expected_raise: Any, expected_inv_length: int) -> None: + """ + Test anta.get.utils.create_inventory_from_ansible + """ + target_file = tmp_path / "inventory.yml" + inventory_file_path = DATA_DIR / inventory_filename + + with expected_raise: + if ansible_group: + create_inventory_from_ansible(inventory_file_path, target_file, ansible_group) + else: + create_inventory_from_ansible(inventory_file_path, target_file) + + assert target_file.exists() + inv = AntaInventory().parse(str(target_file), "user", "pass") + assert len(inv) == expected_inv_length + if not isinstance(expected_raise, nullcontext): + assert not target_file.exists()