From add1f5b46d3b777a60ee74c6b90e6c458016b43b Mon Sep 17 00:00:00 2001 From: Mark Chappell Date: Wed, 2 Nov 2022 12:51:14 +0100 Subject: [PATCH] break up get_target_from_rule (#1221) ec2_security_group - refacter get_target_from_rule() SUMMARY refacter get_target_from_rule to bring down the complexity score Builds on top of #1214 ISSUE TYPE Feature Pull Request COMPONENT NAME ec2_security_group ADDITIONAL INFORMATION Reviewed-by: Alina Buzachis --- .../fragments/1221-ec2_security_group.yml | 2 + plugins/modules/ec2_security_group.py | 277 +++++++++++------- .../test_get_target_from_rule.py | 101 +++++++ .../ec2_security_group/test_validate_ip.py | 66 +++++ .../modules/test_ec2_security_group.py | 15 - 5 files changed, 332 insertions(+), 129 deletions(-) create mode 100644 changelogs/fragments/1221-ec2_security_group.yml create mode 100644 tests/unit/plugins/modules/ec2_security_group/test_get_target_from_rule.py create mode 100644 tests/unit/plugins/modules/ec2_security_group/test_validate_ip.py diff --git a/changelogs/fragments/1221-ec2_security_group.yml b/changelogs/fragments/1221-ec2_security_group.yml new file mode 100644 index 00000000000..be97d077e2a --- /dev/null +++ b/changelogs/fragments/1221-ec2_security_group.yml @@ -0,0 +1,2 @@ +minor_changes: +- ec2_security_group - refacter ``get_target_from_rule()`` (https://github.com/ansible-collections/amazon.aws/pull/1221). diff --git a/plugins/modules/ec2_security_group.py b/plugins/modules/ec2_security_group.py index 6e9e694ad66..6dc87e27eb2 100644 --- a/plugins/modules/ec2_security_group.py +++ b/plugins/modules/ec2_security_group.py @@ -485,7 +485,6 @@ import re from collections import namedtuple from copy import deepcopy -from ipaddress import IPv6Network from ipaddress import ip_network from time import sleep @@ -694,6 +693,116 @@ def validate_rule(rule): raise SecurityGroupError(msg='Specify proto: icmp or icmpv6 when using icmp_type/icmp_code') +def _target_from_rule_with_group_id(rule, groups): + owner_id = current_account_id + FOREIGN_SECURITY_GROUP_REGEX = r'^([^/]+)/?(sg-\S+)?/(\S+)' + foreign_rule = re.match(FOREIGN_SECURITY_GROUP_REGEX, rule['group_id']) + + if not foreign_rule: + return 'group', (owner_id, rule['group_id'], None), False + + # this is a foreign Security Group. Since you can't fetch it you must create an instance of it + # Matches on groups like amazon-elb/sg-5a9c116a/amazon-elb-sg, amazon-elb/amazon-elb-sg, + # and peer-VPC groups like 0987654321/sg-1234567890/example + owner_id, group_id, group_name = foreign_rule.groups() + group_instance = dict(UserId=owner_id, GroupId=group_id, GroupName=group_name) + groups[group_id] = group_instance + groups[group_name] = group_instance + if group_id and group_name: + if group_name.startswith('amazon-'): + # amazon-elb and amazon-prefix rules don't need group_id specified, + group_id = None + else: + # For cross-VPC references we'll use group_id as it is more specific + group_name = None + return 'group', (owner_id, group_id, group_name), False + + +def _lookup_target_or_fail(client, group_name, vpc_id, groups, msg): + owner_id = current_account_id + filters = {'group-name': group_name} + if vpc_id: + filters['vpc-id'] = vpc_id + + filters = ansible_dict_to_boto3_filter_list(filters) + try: + found_group = get_security_groups_with_backoff(client, Filters=filters).get('SecurityGroups', [])[0] + except (is_boto3_error_code('InvalidGroup.NotFound'), IndexError): + raise SecurityGroupError(msg=msg) + except (BotoCoreError, ClientError) as e: # pylint: disable=duplicate-except + raise SecurityGroupError(msg="Failed to get security group", e=e) + + group_id = found_group['GroupId'] + groups[group_id] = found_group + groups[group_name] = found_group + return 'group', (owner_id, group_id, None), False + + +def _create_target_from_rule(client, rule, groups, vpc_id, check_mode): + owner_id = current_account_id + # We can't create a group in check mode... + if check_mode: + return 'group', (owner_id, None, None), True + + group_name = rule['group_name'] + + try: + created_group = _create_security_group_with_wait(client, group_name, rule['group_desc'], vpc_id) + except is_boto3_error_code('InvalidGroup.Duplicate'): + # The group exists, but didn't show up in any of our previous describe-security-groups calls + # Try searching on a filter for the name, and allow a retry window for AWS to update + # the model on their end. + fail_msg = "Could not create or use existing group '{0}' in rule {1}. " \ + "Make sure the group exists and try using the group_id " \ + "instead of the name".format(group_name, rule) + return _lookup_target_or_fail(client, group_name, vpc_id, groups, fail_msg) + except (BotoCoreError, ClientError) as e: + raise SecurityGroupError(msg="Failed to create security group '{0}' in rule {1}", e=e) + + group_id = created_group['GroupId'] + groups[group_id] = created_group + groups[group_name] = created_group + + return 'group', (owner_id, group_id, None), True + + +def _target_from_rule_with_group_name(client, rule, name, group, groups, vpc_id, check_mode): + group_name = rule['group_name'] + owner_id = current_account_id + if group_name == name: + # Simplest case, the rule references itself + group_id = group['GroupId'] + groups[group_id] = group + groups[group_name] = group + return 'group', (owner_id, group_id, None), False + + # Already cached groups + if group_name in groups and group.get('VpcId') and groups[group_name].get('VpcId'): + # both are VPC groups, this is ok + group_id = groups[group_name]['GroupId'] + return 'group', (owner_id, group_id, None), False + + if group_name in groups and not (group.get('VpcId') or groups[group_name].get('VpcId')): + # both are EC2 classic, this is ok + group_id = groups[group_name]['GroupId'] + return 'group', (owner_id, group_id, None), False + + # if we got here, either the target group does not exist, or there + # is a mix of EC2 classic + VPC groups. Mixing of EC2 classic + VPC + # is bad, so we have to create a new SG because no compatible group + # exists + + # Without a group description we can't create a new group, try looking up the group, or fail + # with a descriptive error message + if not rule.get('group_desc', '').strip(): + # retry describing the group + fail_msg = "group '{0}' not found and would be automatically created by rule {1} but " \ + "no description was provided".format(group_name, rule) + return _lookup_target_or_fail(client, group_name, vpc_id, groups, fail_msg) + + return _create_target_from_rule(client, rule, groups, vpc_id, check_mode) + + def get_target_from_rule(module, client, rule, name, group, groups, vpc_id): """ Returns tuple of (target_type, target, group_created) after validating rule params. @@ -711,101 +820,21 @@ def get_target_from_rule(module, client, rule, name, group, groups, vpc_id): values that will be compared to current_rules (from current_ingress and current_egress) in wait_for_rule_propagation(). """ - FOREIGN_SECURITY_GROUP_REGEX = r'^([^/]+)/?(sg-\S+)?/(\S+)' - owner_id = current_account_id - group_id = None - group_name = None - target_group_created = False - try: validate_rule(rule) + if rule.get('group_id'): + return _target_from_rule_with_group_id(rule, groups) + elif 'group_name' in rule: + return _target_from_rule_with_group_name(client, rule, name, group, groups, vpc_id, module.check_mode) + elif 'cidr_ip' in rule: + return 'ipv4', validate_ip(module, rule['cidr_ip']), False + elif 'cidr_ipv6' in rule: + return 'ipv6', validate_ip(module, rule['cidr_ipv6']), False + elif 'ip_prefix' in rule: + return 'ip_prefix', rule['ip_prefix'], False except SecurityGroupError as e: e.fail(module) - if rule.get('group_id') and re.match(FOREIGN_SECURITY_GROUP_REGEX, rule['group_id']): - # this is a foreign Security Group. Since you can't fetch it you must create an instance of it - # Matches on groups like amazon-elb/sg-5a9c116a/amazon-elb-sg, amazon-elb/amazon-elb-sg, - # and peer-VPC groups like 0987654321/sg-1234567890/example - owner_id, group_id, group_name = re.match(FOREIGN_SECURITY_GROUP_REGEX, rule['group_id']).groups() - group_instance = dict(UserId=owner_id, GroupId=group_id, GroupName=group_name) - groups[group_id] = group_instance - groups[group_name] = group_instance - if group_id and group_name: - if group_name.startswith('amazon-'): - # amazon-elb and amazon-prefix rules don't need group_id specified, - group_id = None - else: - # group_id/group_name are mutually exclusive - give group_id more precedence as it is more specific - group_name = None - return 'group', (owner_id, group_id, group_name), False - elif 'group_id' in rule: - return 'group', (owner_id, rule['group_id'], None), False - elif 'group_name' in rule: - group_name = rule['group_name'] - if group_name == name: - group_id = group['GroupId'] - groups[group_id] = group - groups[group_name] = group - elif group_name in groups and group.get('VpcId') and groups[group_name].get('VpcId'): - # both are VPC groups, this is ok - group_id = groups[group_name]['GroupId'] - elif group_name in groups and not (group.get('VpcId') or groups[group_name].get('VpcId')): - # both are EC2 classic, this is ok - group_id = groups[group_name]['GroupId'] - else: - auto_group = None - filters = {'group-name': group_name} - if vpc_id: - filters['vpc-id'] = vpc_id - # if we got here, either the target group does not exist, or there - # is a mix of EC2 classic + VPC groups. Mixing of EC2 classic + VPC - # is bad, so we have to create a new SG because no compatible group - # exists - if not rule.get('group_desc', '').strip(): - # retry describing the group once - try: - auto_group = get_security_groups_with_backoff(client, Filters=ansible_dict_to_boto3_filter_list(filters)).get('SecurityGroups', [])[0] - except (is_boto3_error_code('InvalidGroup.NotFound'), IndexError): - module.fail_json(msg="group %s will be automatically created by rule %s but " - "no description was provided" % (group_name, rule)) - except ClientError as e: # pylint: disable=duplicate-except - module.fail_json_aws(e) - elif not module.check_mode: - params = dict(GroupName=group_name, Description=rule['group_desc']) - if vpc_id: - params['VpcId'] = vpc_id - try: - auto_group = client.create_security_group(aws_retry=True, **params) - get_waiter( - client, 'security_group_exists', - ).wait( - GroupIds=[auto_group['GroupId']], - ) - except is_boto3_error_code('InvalidGroup.Duplicate'): - # The group exists, but didn't show up in any of our describe-security-groups calls - # Try searching on a filter for the name, and allow a retry window for AWS to update - # the model on their end. - try: - auto_group = get_security_groups_with_backoff(client, Filters=ansible_dict_to_boto3_filter_list(filters)).get('SecurityGroups', [])[0] - except IndexError: - module.fail_json(msg="Could not create or use existing group '{0}' in rule. Make sure the group exists".format(group_name)) - except ClientError as e: - module.fail_json_aws( - e, - msg="Could not create or use existing group '{0}' in rule. Make sure the group exists".format(group_name)) - if auto_group is not None: - group_id = auto_group['GroupId'] - groups[group_id] = auto_group - groups[group_name] = auto_group - target_group_created = True - return 'group', (owner_id, group_id, None), target_group_created - elif 'cidr_ip' in rule: - return 'ipv4', validate_ip(module, rule['cidr_ip']), False - elif 'cidr_ipv6' in rule: - return 'ipv6', validate_ip(module, rule['cidr_ipv6']), False - elif 'ip_prefix' in rule: - return 'ip_prefix', rule['ip_prefix'], False - module.fail_json(msg="Could not match target for rule", failed_rule=rule) @@ -977,30 +1006,36 @@ def authorize(client, module, ip_permissions, group_id, rule_type): def validate_ip(module, cidr_ip): split_addr = cidr_ip.split('/') - if len(split_addr) == 2: - # this_ip is a IPv4 or IPv6 CIDR that may or may not have host bits set - # Get the network bits if IPv4, and validate if IPv6. - try: - ip = to_subnet(split_addr[0], split_addr[1]) - if ip != cidr_ip: - module.warn("One of your CIDR addresses ({0}) has host bits set. To get rid of this warning, " - "check the network mask and make sure that only network bits are set: {1}.".format( - cidr_ip, ip)) - except ValueError: - # to_subnet throws a ValueError on IPv6 networks, so we should be working with v6 if we get here - try: - isinstance(ip_network(to_text(cidr_ip)), IPv6Network) - ip = cidr_ip - except ValueError: - # If a host bit is set on something other than a /128, IPv6Network will throw a ValueError - # The ipv6_cidr in this case probably looks like "2001:DB8:A0B:12F0::1/64" and we just want the network bits - ip6 = to_ipv6_subnet(split_addr[0]) + "/" + split_addr[1] - if ip6 != cidr_ip: - module.warn("One of your IPv6 CIDR addresses ({0}) has host bits set. To get rid of this warning, " - "check the network mask and make sure that only network bits are set: {1}.".format(cidr_ip, ip6)) - return ip6 + if len(split_addr) != 2: + return cidr_ip + + try: + ip = ip_network(to_text(cidr_ip)) + return str(ip) + except ValueError: + # If a host bit is incorrectly set, ip_network will throw an error at us, + # we'll continue, convert the address to a CIDR AWS will accept, but issue a warning. + pass + + # Try evaluating as an IPv4 network, it'll throw a ValueError if it can't parse cidr_ip as an + # IPv4 network + try: + ip = to_subnet(split_addr[0], split_addr[1]) + module.warn("One of your CIDR addresses ({0}) has host bits set. To get rid of this warning, " + "check the network mask and make sure that only network bits are set: {1}.".format(cidr_ip, ip)) return ip - return cidr_ip + except ValueError: + pass + + # Try again, evaluating as an IPv6 network. + try: + ip6 = to_ipv6_subnet(split_addr[0]) + "/" + split_addr[1] + module.warn("One of your IPv6 CIDR addresses ({0}) has host bits set. To get rid of this warning, " + "check the network mask and make sure that only network bits are set: {1}.".format(cidr_ip, ip6)) + return ip6 + except ValueError: + module.warn("Unable to parse CIDR ({0}).".format(cidr_ip)) + return cidr_ip def update_tags(client, module, group_id, current_tags, tags, purge_tags): @@ -1048,6 +1083,20 @@ def update_rule_descriptions(module, client, group_id, present_ingress, named_tu return changed +def _create_security_group_with_wait(client, name, description, vpc_id): + params = dict(GroupName=name, Description=description) + if vpc_id: + params['VpcId'] = vpc_id + + created_group = client.create_security_group(aws_retry=True, **params) + get_waiter( + client, 'security_group_exists', + ).wait( + GroupIds=[created_group['GroupId']], + ) + return created_group + + def create_security_group(client, module, name, description, vpc_id): if not module.check_mode: params = dict(GroupName=name, Description=description) diff --git a/tests/unit/plugins/modules/ec2_security_group/test_get_target_from_rule.py b/tests/unit/plugins/modules/ec2_security_group/test_get_target_from_rule.py new file mode 100644 index 00000000000..950467fff0d --- /dev/null +++ b/tests/unit/plugins/modules/ec2_security_group/test_get_target_from_rule.py @@ -0,0 +1,101 @@ +# (c) 2022 Red Hat Inc. +# +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from copy import deepcopy +import pytest +from unittest.mock import sentinel + +import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module + + +@pytest.fixture +def ec2_security_group(monkeypatch): + # monkey patches various ec2_security_group module functions, we'll separately test the operation of + # these functions, we just care that it's passing the results into the right place in the + # instance spec. + monkeypatch.setattr(ec2_security_group_module, 'current_account_id', sentinel.CURRENT_ACCOUNT_ID) + return ec2_security_group_module + + +def test_target_from_rule_with_group_id_local_group(ec2_security_group): + groups = dict() + original_groups = deepcopy(groups) + rule_type, target, created = ec2_security_group._target_from_rule_with_group_id( + dict(group_id='sg-123456789abcdef01'), + groups, + ) + assert groups == original_groups + assert rule_type == 'group' + assert created is False + assert target[0] is sentinel.CURRENT_ACCOUNT_ID + assert target[1] == 'sg-123456789abcdef01' + assert target[2] is None + + +def test_target_from_rule_with_group_id_peer_group(ec2_security_group): + groups = dict() + rule_type, target, created = ec2_security_group._target_from_rule_with_group_id( + dict(group_id='123456789012/sg-123456789abcdef02/example-group-name'), + groups, + ) + assert rule_type == 'group' + assert created is False + assert target[0] == '123456789012' + assert target[1] == 'sg-123456789abcdef02' + assert target[2] is None + + assert sorted(groups.keys()) == ['example-group-name', 'sg-123456789abcdef02'] + rule_by_id = groups['sg-123456789abcdef02'] + rule_by_name = groups['example-group-name'] + + assert rule_by_id is rule_by_name + assert rule_by_id['UserId'] == '123456789012' + assert rule_by_id['GroupId'] == 'sg-123456789abcdef02' + assert rule_by_id['GroupName'] == 'example-group-name' + + +def test_target_from_rule_with_group_id_elb(ec2_security_group): + groups = dict() + rule_type, target, created = ec2_security_group._target_from_rule_with_group_id( + dict(group_id='amazon-elb/amazon-elb-sg'), + groups, + ) + assert rule_type == 'group' + assert created is False + assert target[0] == 'amazon-elb' + assert target[1] is None + assert target[2] == 'amazon-elb-sg' + + assert 'amazon-elb-sg' in groups.keys() + rule_by_name = groups['amazon-elb-sg'] + + assert rule_by_name['UserId'] == 'amazon-elb' + assert rule_by_name['GroupId'] is None + assert rule_by_name['GroupName'] == 'amazon-elb-sg' + + +def test_target_from_rule_with_group_id_elb_with_sg(ec2_security_group): + groups = dict() + rule_type, target, created = ec2_security_group._target_from_rule_with_group_id( + dict(group_id='amazon-elb/sg-5a9c116a/amazon-elb-sg'), + groups, + ) + assert rule_type == 'group' + assert created is False + assert target[0] == 'amazon-elb' + assert target[1] is None + assert target[2] == 'amazon-elb-sg' + + assert sorted(groups.keys()) == ['amazon-elb-sg', 'sg-5a9c116a'] + rule_by_id = groups['sg-5a9c116a'] + rule_by_name = groups['amazon-elb-sg'] + + assert rule_by_id is rule_by_name + assert rule_by_id['UserId'] == 'amazon-elb' + assert rule_by_id['GroupId'] == 'sg-5a9c116a' + assert rule_by_id['GroupName'] == 'amazon-elb-sg' diff --git a/tests/unit/plugins/modules/ec2_security_group/test_validate_ip.py b/tests/unit/plugins/modules/ec2_security_group/test_validate_ip.py new file mode 100644 index 00000000000..d7cef80dd5e --- /dev/null +++ b/tests/unit/plugins/modules/ec2_security_group/test_validate_ip.py @@ -0,0 +1,66 @@ +# (c) 2022 Red Hat Inc. +# +# This file is part of Ansible +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import pytest +from unittest.mock import MagicMock +from unittest.mock import sentinel +import warnings + +import ansible_collections.amazon.aws.plugins.modules.ec2_security_group as ec2_security_group_module + + +@pytest.fixture +def aws_module(): + aws_module = MagicMock() + aws_module.warn = warnings.warn + return aws_module + + +@pytest.fixture +def ec2_security_group(monkeypatch): + # monkey patches various ec2_security_group module functions, we'll separately test the operation of + # these functions, we just care that it's passing the results into the right place in the + # instance spec. + monkeypatch.setattr(ec2_security_group_module, 'current_account_id', sentinel.CURRENT_ACCOUNT_ID) + return ec2_security_group_module + + +IPS_GOOD = [ + ('192.0.2.2', '192.0.2.2',), + ('192.0.2.1/32', '192.0.2.1/32',), + ('192.0.2.1/255.255.255.255', '192.0.2.1/32',), + ('192.0.2.0/24', '192.0.2.0/24',), + ('192.0.2.0/255.255.255.255', '192.0.2.0/32',), + ('2001:db8::1/128', '2001:db8::1/128',), + ('2001:db8::/32', '2001:db8::/32',), + ('2001:db8:fe80:b897:8990:8a7c:99bf:323d/128', '2001:db8:fe80:b897:8990:8a7c:99bf:323d/128'), +] + +IPS_WARN = [ + ('192.0.2.1/24', '192.0.2.0/24', 'One of your CIDR addresses'), + ('2001:DB8::1/32', '2001:DB8::/32', 'One of your IPv6 CIDR addresses'), + ('2001:db8:fe80:b897:8990:8a7c:99bf:323d/64', '2001:db8:fe80:b897::/64', 'One of your IPv6 CIDR addresses'), +] + + +@pytest.mark.parametrize("ip,expected", IPS_GOOD) +def test_validate_ip_no_warn(ec2_security_group, aws_module, ip, expected): + with warnings.catch_warnings(): + warnings.simplefilter("error") + result = ec2_security_group.validate_ip(aws_module, ip) + + assert result == expected + + +@pytest.mark.parametrize("ip,expected,warn_msg", IPS_WARN) +def test_validate_ip_warn(ec2_security_group, aws_module, ip, warn_msg, expected): + with pytest.warns(UserWarning, match=warn_msg) as recorded: + result = ec2_security_group.validate_ip(aws_module, ip) + + assert len(recorded) == 1 + assert result == expected diff --git a/tests/unit/plugins/modules/test_ec2_security_group.py b/tests/unit/plugins/modules/test_ec2_security_group.py index 1ebbe86c680..c75dcac816a 100644 --- a/tests/unit/plugins/modules/test_ec2_security_group.py +++ b/tests/unit/plugins/modules/test_ec2_security_group.py @@ -66,18 +66,3 @@ def test_rule_to_permission(): perm = group_module.to_permission(test) assert perm['FromPort'], perm['ToPort'] == test.port_range assert perm['IpProtocol'] == test.protocol - - -def test_validate_ip(): - class Warner(object): - def warn(self, msg): - return - ips = [ - ('10.1.1.1/24', '10.1.1.0/24'), - ('192.168.56.101/16', '192.168.0.0/16'), - # Don't modify IPv6 CIDRs, AWS supports /128 and device ranges - ('fc00:8fe0:fe80:b897:8990:8a7c:99bf:323d/128', 'fc00:8fe0:fe80:b897:8990:8a7c:99bf:323d/128'), - ] - - for ip, net in ips: - assert group_module.validate_ip(Warner(), ip) == net