Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(panos-upgrade-assurance): upgrade assurance modules introduction #437

Closed
wants to merge 14 commits into from
163 changes: 163 additions & 0 deletions plugins/modules/panos_active_in_ha.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-

# Copyright 2023 Palo Alto Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import absolute_import, division, print_function

__metaclass__ = type

DOCUMENTATION = """
---
module: panos_active_in_ha
short_description: Checks if node is an active node in HA pair.
description:
- A wrapper around the PAN-OS Upgrade Assurance package.
- A simple boolean check, verifies if a node is an active (B(true)) or passive (B(false)) node in an HA pair.
- If node does not belong to an HA pair or the pair is no configured correctly the module will fail.
author: "Łukasz Pawlęga (@fosix)"
version_added: '2.16.0'
requirements:
- pan-python can be obtained from PyPI U(https://pypi.python.org/pypi/pan-python)
- pandevice can be obtained from PyPI U(https://pypi.python.org/pypi/pandevice)
- pan-os-upgrade-assurance can be obtained from PyPI U(https://pypi.org/project/panos-upgrade-assurance)
notes:
- Only Firewalls are supported.
- Check mode is not supported.
extends_documentation_fragment:
- paloaltonetworks.panos.fragments.transitional_provider
- paloaltonetworks.panos.fragments.vsys
options:
force_fail:
description:
- When set to B(true) will make the module fail also when node is passive.
This option is useful when we want to skip using M(ansible.builtin.assert).
type: bool
default: false
skip_config_sync:
description:
- When set to B(true) will skip configuration synchronization state between nodes before trying to retrieve
node's current state in an HA pair. Can be useful when working with partially upgraded nodes. Use with caution.
type: bool
default: false
# """

EXAMPLES = """
- name: Check if a node is active in HA pair
panos_active_in_ha:
provider: '{{ provider }}'
register: active_ha
- name: Run tasks dedicated to active node
ansible.builtin.include_tasks: active_dedicated.yml
when: active_ha.response.active
"""

RETURN = """
# Default return values
response:
description:
- Information on test results.
- This dict is available also when module is failed.
returned: always
type: dict
sample:
active: true
reason: '[SUCCESS]'
contains:
active:
description: Information if the device is active or not.
returned: always
type: bool
reason:
description: Meaningful if the device is not active.
returned: always
type: str
"""

from ansible.module_utils.basic import AnsibleModule
from ansible_collections.paloaltonetworks.panos.plugins.module_utils.panos import (
get_connection,
)

try:
from panos.panorama import Panorama
from panos_upgrade_assurance.check_firewall import CheckFirewall
from panos_upgrade_assurance.firewall_proxy import FirewallProxy
from panos_upgrade_assurance.utils import CheckStatus
except ImportError:
pass


def get_firewall_proxy_object(module_params: dict):
provider = module_params["provider"]
if provider["serial_number"]:
panorama = Panorama(
hostname=provider["ip_address"],
api_username=provider["username"],
api_password=provider["password"],
)
firewall = FirewallProxy(
serial=provider["serial_number"], vsys=module_params["vsys"]
)
panorama.add(firewall)
return firewall
else:
return FirewallProxy(
hostname=provider["ip_address"],
api_username=provider["username"],
api_password=provider["password"],
vsys=module_params["vsys"],
)


def main():
results = dict()

helper = get_connection(
vsys=True,
with_classic_provider_spec=True,
argument_spec=dict(
force_fail=dict(type="bool", default=False),
skip_config_sync=dict(type="bool", default=False),
),
)

module = AnsibleModule(
argument_spec=helper.argument_spec, supports_check_mode=False
)

firewall = get_firewall_proxy_object(module.params)

is_active = CheckFirewall(firewall).check_is_ha_active(
skip_config_sync=module.params["skip_config_sync"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have added another parameter ignore_non_functional to this method but we can leave it for a separate PR.

)

if module.params["force_fail"]:
response = str(is_active)
module_failed = not bool(is_active)
else:
response = {"active": bool(is_active), "reason": str(is_active)}
module_failed = (
True
if is_active.status in [CheckStatus.ERROR, CheckStatus.SKIPPED]
else False
)

module.exit_json(changed=False, response=response, failed=module_failed)


if __name__ == "__main__":
main()
215 changes: 215 additions & 0 deletions plugins/modules/panos_readiness_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-

# Copyright 2023 Palo Alto Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import, division, print_function

__metaclass__ = type

DOCUMENTATION = """
---
module: panos_readiness_checks
short_description: Runs readiness checks (boolean in nature) against a Firewall device.
description:
- A wrapper around the PAN-OS Upgrade Assurance package.
- The module is meant to run readiness checks available in the package's L(CheckFirewall.run_readiness_checks()
method,https://pan.dev/panos/docs/panos-upgrade-assurance/api/check_firewall/#checkfirewallrun_readiness_checks).
Since it's just a wrapper, the way you would configure a check is exactly the same as if you would run the class directly.
Please refer to package's documentation for L(syntax,https://pan.dev/panos/docs/panos-upgrade-assurance/configuration-details/#readiness-checks)
and L(configuration dialect,https://pan.dev/panos/docs/panos-upgrade-assurance/dialect/).
author: "Łukasz Pawlęga (@fosix)"
version_added: '2.16.0'
requirements:
- pan-python can be obtained from PyPI U(https://pypi.python.org/pypi/pan-python)
- pandevice can be obtained from PyPI U(https://pypi.python.org/pypi/pandevice)
- pan-os-upgrade-assurance can be obtained from PyPI U(https://pypi.org/project/panos-upgrade-assurance)
notes:
- Only Firewalls are supported.
- Check mode is not supported.
extends_documentation_fragment:
- paloaltonetworks.panos.fragments.transitional_provider
- paloaltonetworks.panos.fragments.vsys
options:
checks:
description:
- A list of checks that should be run against a device. For the details on currently supported checks please refer to
L(package's documentation,https://pan.dev/panos/docs/panos-upgrade-assurance/configuration-details/#readiness-checks).
Comment on lines +48 to +49
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's reasons like this why having the upgrade assurance stuff as completely separate from this collection is problematic.

It's a bad user experience to read documentation to be referred to other documentation for potential values. And the waters get even muddier when a user's installed version of the upgrade assurance stuff is out of sync with what is live, and the values have changed.

These are not insurmountable issues, but you'll need to be ready for when it inevitably happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, but panos-upgrade-assurance is not for Ansible only, XSOAR can use it as well. Or just some pure Python script. Maybe more reasonable would be to keep it as part of the pan-os-python package?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ok. Separate it is, then! So you'll want to add a new parameter to ./plugins/module_utils/panos.py > get_connection() that allows you to do version checking against panos_upgrade_assurance.version the same as happens for both PAN-OS itself and pan-os-python / pandevice. Please have the logic of this new check look like this:

if self.min_panos_upgrade_assurance is not None:
    try:
        import panos_upgrade_assurance
    except ImportError:
        module.fail_json(msg='Missing required library "panos_upgrade_assurance".', syspath=sys.path)
    # This code assumes both panos_upgrade_assurance.version and self.min_panos_upgrade_assurance
    # are a tuple of 3 ints.  If panos_upgrade_assurance.version is a string, then you'll have
    # to turn it into a 3 element tuple of ints to do the comparison.
    if panos_upgrade_assurance.version < self.min_panos_upgrade_assurance:
        module.fail_json(msg=MIN_VERSION_ERROR.format(
            "panos_upgrade_assurance",
            _vstr(panos_upgrade_assurance.version),
            _vstr(self.min_panos_upgrade_assurance),
        ))

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shinmog I am thinking we can use a super call in the FirewallProxy class to get the best of both worlds. Subclassing FirewallProxy makes life easy so what I'm thinking is something like (psuedocode here)

firewall = helper.get_pandevice_parent().device
firewall_proxy = FirewallProxy(firewall)

# ... in assurance library ...

Class FirewallProxy(Firewall):
    def __init__(self, firewall_object: Optional[Firewall] = None, *args, **kwargs):
        """
        """
        if firewall_object:
            super(FirewallProxy, self).__init__(
                api_key=firewall_object.xapi.api_key,
                api_password=firewall_object.xapi.api_password,
                api_username=firewall_object.xapi.api_username,
                serial=firewall_object.xapi.serial,
                *args,
                **kwargs
            )
        else:
            super(FirewallProxy, self).__init__()

Can you see any reason this wouldn't work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finally FirewallProxy has both options, so it can take the api-* parameters or an existing device object. Changes will be introduced in upgrade assurance with this pr.

This way we can leave the upgrade assurance as a standalone package and still use the helper class inside Ansible modules.

If someone has any objections, please leave a comment in the upgrade assurance PR.

- In most of the cases it is enough to specify a check name to run it with default settings.
In this case the list element is of type B(str). If additional configuration is required the element is a one element B(dict),
where key is the check name and value contains the check's configuration. For information which check requires additional configuration
please refer to L(package documentation,https://pan.dev/panos/docs/panos-upgrade-assurance/configuration-details/#readiness-checks).
type: list
elements: raw
default: ["all"]
force_fail:
description: When set to B(true) will make the module fail when at least one of the checks did not pass.
type: bool
default: false
skip_force_locale:
description:
- When set to B(true) will skip the B(en_US.UTF-8) locales on the checks.
- Use with caution only when you actually use different, English based locales but you do not have B(en_US.UTF-8) installed.
type: bool
default: false
"""

EXAMPLES = """
- name: Run all management plane checks using NOT notation
panos_readiness_checks:
provider: '{{ device }}'
checks:
- '!ha'
- '!session_exist'
- '!arp_entry_exist'
- '!ip_sec_tunnel_status'

- name: Check if a specified session exists in vsys2, fail if it does not
panos_readiness_checks:
provider: '{{ device }}'
vsys: vsys2
force_fail: true
checks:
- session_exist:
source: '34.23.15.1'
destination: '10.1.0.4'
dest_port: '80'
"""

RETURN = """
response:
description:
- This is a B(dict) where keys are checks names just as you specify them in the I(checks) property.
- Each value is also a B(dict).
- WHen I(force_fail) has the default value of B(false) this B(dict) contains results for all checks that were specified in I(checks) property.
- When I(force_fail) is set to B(true) it contains only checks that failed.
type: dict
returned: always
sample:
arp_entry_exist:
reason: "[SKIPPED] Missing ARP table entry description."
state: false
candidate_config:
reason: "[FAIL] Pending changes found on device."
state: false
content_version:
reason: "[FAIL] Installed content DB version (8640-7694) is not the latest one (8697-7981)."
state: false
free_disk_space:
reason: "[SUCCESS] "
state: true
ha:
reason: "[ERROR] Device is not a member of an HA pair."
state: false
ip_sec_tunnel_status:
reason: "[SKIPPED] Missing tunnel specification."
state: false
ntp_sync:
reason: "[ERROR] No NTP server configured."
state: false
panorama:
reason: "[SUCCESS] "
state: true
session_exist:
reason: "[SKIPPED] Missing critical session description. Failing check."
state: false
contains:
state:
description: A result of a check.
type: bool
returned: always
reason:
description:
- A free text describing the check result.
- 'Prefixed with a keyword: SUCCESS, FAIL, ERROR, SKIPPED.'
- Meaningful only for failed tests as the ones succeeded are self explanatory.
type: str
returned: always
"""

from ansible.module_utils.basic import AnsibleModule
from ansible_collections.paloaltonetworks.panos.plugins.module_utils.panos import (
get_connection,
)

try:
from panos_upgrade_assurance.check_firewall import CheckFirewall
from panos_upgrade_assurance.firewall_proxy import FirewallProxy
from panos.panorama import Panorama
except ImportError:
pass
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we have the PUA_AVAILABLE logic here as in the panos_snapshot_report.py?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here



def get_firewall_proxy_object(module_params: dict):
provider = module_params["provider"]
if provider["serial_number"]:
panorama = Panorama(
hostname=provider["ip_address"],
api_username=provider["username"],
api_password=provider["password"],
)
firewall = FirewallProxy(
serial=provider["serial_number"], vsys=module_params["vsys"]
)
panorama.add(firewall)
return firewall
else:
return FirewallProxy(
hostname=provider["ip_address"],
api_username=provider["username"],
api_password=provider["password"],
vsys=module_params["vsys"],
)


def main():
results = dict()

helper = get_connection(
vsys=True,
with_classic_provider_spec=True,
argument_spec=dict(
checks=dict(type="list", default=["all"], elements="raw"),
force_fail=dict(type="bool", default=False),
skip_force_locale=dict(type="bool", default=False),
),
)

module = AnsibleModule(
argument_spec=helper.argument_spec,
supports_check_mode=False,
)
results = dict()
module_failed = False

firewall = get_firewall_proxy_object(module.params)

checks = CheckFirewall(
node=firewall, skip_force_locale=module.params["skip_force_locale"]
)
results = checks.run_readiness_checks(checks_configuration=module.params["checks"])

if module.params["force_fail"]:
for check in list(results.keys()):
if results[check]["state"]:
del results[check]
else:
module_failed = True

module.exit_json(changed=False, response=results, failed=module_failed)


if __name__ == "__main__":
main()
Loading