diff --git a/cdpctl/validation/infra/validate_aws_security_groups.py b/cdpctl/validation/infra/validate_aws_security_groups.py index d4147dd..20566c6 100644 --- a/cdpctl/validation/infra/validate_aws_security_groups.py +++ b/cdpctl/validation/infra/validate_aws_security_groups.py @@ -212,7 +212,7 @@ def _aws_gateway_security_groups_contains_cdp_cidr_validation( def security_groups_contains_vpc_cidr( - config: Dict[str, Any], security_groups_id: str + config: Dict[str, Any], ec2_client: EC2Client, security_groups_id: str ) -> None: """Verify security groups contains the VPC CIDRs.""" vpc_id: str = get_config_value( @@ -222,8 +222,6 @@ def security_groups_contains_vpc_cidr( data_expected_error_message="No vpc id provided for " "config option: {}", ) - ec2_client: EC2Client = get_client("ec2", config) - vpcs = ec2_client.describe_vpcs(VpcIds=[vpc_id]) if len(vpcs["Vpcs"]) == 0: @@ -240,9 +238,17 @@ def security_groups_contains_vpc_cidr( for group in security_groups["SecurityGroups"]: ip_permissions = group["IpPermissions"] + for ip_permission in ip_permissions: + if "IpProtocol" in ip_permission and ip_permission["IpProtocol"] == "-1": + for cidr in ip_permission["IpRanges"]: + if cidr["CidrIp"] == vpc_cidr: + found_vpc_cidr = True + continue + if "FromPort" not in ip_permission or "ToPort" not in ip_permission: continue + from_port = ip_permission["FromPort"] to_port = ip_permission["ToPort"] @@ -258,6 +264,14 @@ def security_groups_contains_vpc_cidr( @pytest.mark.infra def aws_default_security_groups_contains_vpc_cidr_validation( config: Dict[str, Any] +) -> None: + """Default security groups contain the existing VPC CIDRs.""" # noqa: D401,E501 + ec2_client: EC2Client = get_client("ec2", config) + _aws_default_security_groups_contains_vpc_cidr_validation(config, ec2_client) + + +def _aws_default_security_groups_contains_vpc_cidr_validation( + config: Dict[str, Any], ec2_client: EC2Client ) -> None: """Default security groups contain the existing VPC CIDRs.""" # noqa: D401,E501 default_security_groups_id: str = get_config_value( @@ -269,7 +283,9 @@ def aws_default_security_groups_contains_vpc_cidr_validation( "config option: {}", ) - if not security_groups_contains_vpc_cidr(config, default_security_groups_id): + if not security_groups_contains_vpc_cidr( + config, ec2_client, default_security_groups_id + ): pytest.fail( "Your VPC CIDR should be allowed for port " " 0-65535 in default security group.", @@ -281,6 +297,14 @@ def aws_default_security_groups_contains_vpc_cidr_validation( @pytest.mark.infra def aws_gateway_security_groups_contains_vpc_cidr_validation( config: Dict[str, Any] +) -> None: + """Gateway security groups contain the existing VPC CIDRs.""" # noqa: D401,E501 + ec2_client: EC2Client = get_client("ec2", config) + _aws_gateway_security_groups_contains_vpc_cidr_validation(config, ec2_client) + + +def _aws_gateway_security_groups_contains_vpc_cidr_validation( + config: Dict[str, Any], ec2_client: EC2Client ) -> None: """Gateway security groups contain the existing VPC CIDRs.""" # noqa: D401,E501 gateway_security_groups_id: str = get_config_value( @@ -292,7 +316,9 @@ def aws_gateway_security_groups_contains_vpc_cidr_validation( "config option: {}", ) - if not security_groups_contains_vpc_cidr(config, gateway_security_groups_id): + if not security_groups_contains_vpc_cidr( + config, ec2_client, gateway_security_groups_id + ): pytest.fail( "Your VPC CIDR should be allowed for port " "0-65535 in gateway security group.", diff --git a/tests/validation/infra/test_validate_aws_security_groups.py b/tests/validation/infra/test_validate_aws_security_groups.py index ab16bb2..939d998 100644 --- a/tests/validation/infra/test_validate_aws_security_groups.py +++ b/tests/validation/infra/test_validate_aws_security_groups.py @@ -52,13 +52,16 @@ ) from cdpctl.validation.infra.validate_aws_security_groups import ( _aws_default_security_groups_contains_cdp_cidr_validation, + _aws_default_security_groups_contains_vpc_cidr_validation, _aws_gateway_security_groups_contains_cdp_cidr_validation, + _aws_gateway_security_groups_contains_vpc_cidr_validation, ) from tests.validation import expect_validation_failure, expect_validation_success default_security_group = "test_default_111" gateway_security_group = "test_gateway_111" vpc_id = "test_vpc_111" +vpc_cidr = "30.1.0.0/16" config: Dict[str, Any] = { "env": {"tunnel": False}, @@ -79,6 +82,33 @@ } +def add_describe_vpc_response(stubber: Stubber): + stubber.add_response( + "describe_vpcs", + { + "Vpcs": [ + { + "CidrBlock": vpc_cidr, + "DhcpOptionsId": "dopt-19edf471", + "State": "available", + "VpcId": "vpc-0e9801d129EXAMPLE", + "OwnerId": "111122223333", + "InstanceTenancy": "default", + "CidrBlockAssociationSet": [ + { + "AssociationId": "vpc-cidr-assoc-062c64cfafEXAMPLE", + "CidrBlock": "30.1.0.0/16", + "CidrBlockState": {"State": "associated"}, + } + ], + "Tags": [{"Key": "Name", "Value": "Not Shared"}], + } + ] + }, + expected_params={"VpcIds": [vpc_id]}, + ) + + @mock_iam def test_aws_default_security_groups_contains_cdp_cidr_validation_succeeds( cdp_cidrs: List[str], # noqa : F811 pylint: disable=redefined-outer-name @@ -251,3 +281,209 @@ def test_aws_gateway_security_groups_contains_cdp_cidr_validation_fails( _aws_gateway_security_groups_contains_cdp_cidr_validation ) func(config, ec2_client, cdp_cidrs) + + +@mock_iam +def test_aws_gateway_security_groups_contains_vpc_cidr_validation_succeeds_with_ports(): + """Verify validation succeeds for VPC cidr within gateway security group with ports.""" + ec2_client: EC2Client = boto3.client("ec2", "us-west-2") + stubber = Stubber(ec2_client) + + add_describe_vpc_response(stubber) + + stubber.add_response( + "describe_security_groups", + { + "SecurityGroups": [ + { + "Description": "test", + "GroupName": "test", + "IpPermissions": [ + { + "FromPort": 0, + "ToPort": 65535, + "IpRanges": [{"CidrIp": vpc_cidr}], + } + ], + } + ] + }, + expected_params={"GroupIds": [gateway_security_group]}, + ) + + with stubber: + func = expect_validation_success( + _aws_gateway_security_groups_contains_vpc_cidr_validation + ) + func(config, ec2_client) + + +@mock_iam +def test_aws_gateway_security_groups_contains_vpc_cidr_validation_succeeds_without_ports(): + """Verify validation succeeds for VPC cidr within gateway security group without ports.""" + ec2_client: EC2Client = boto3.client("ec2", "us-west-2") + stubber = Stubber(ec2_client) + + add_describe_vpc_response(stubber) + + stubber.add_response( + "describe_security_groups", + { + "SecurityGroups": [ + { + "Description": "test", + "GroupName": "test", + "IpPermissions": [ + { + "IpProtocol": "-1", + "IpRanges": [{"CidrIp": vpc_cidr}], + } + ], + } + ] + }, + expected_params={"GroupIds": [gateway_security_group]}, + ) + + with stubber: + func = expect_validation_success( + _aws_gateway_security_groups_contains_vpc_cidr_validation + ) + func(config, ec2_client) + + +@mock_iam +def test_aws_gateway_security_groups_contains_vpc_cidr_validation_fails(): + """Verify validation fails for VPC cidr within gateway security group.""" + ec2_client: EC2Client = boto3.client("ec2", "us-west-2") + stubber = Stubber(ec2_client) + + add_describe_vpc_response(stubber) + + stubber.add_response( + "describe_security_groups", + { + "SecurityGroups": [ + { + "Description": "test", + "GroupName": "test", + "IpPermissions": [ + { + "IpProtocol": "-1", + "IpRanges": [{"CidrIp": "fail_cidr"}], + } + ], + } + ] + }, + expected_params={"GroupIds": [gateway_security_group]}, + ) + + with stubber: + func = expect_validation_failure( + _aws_gateway_security_groups_contains_vpc_cidr_validation + ) + func(config, ec2_client) + + +@mock_iam +def test_aws_default_security_groups_contains_vpc_cidr_validation_succeeds_with_ports(): + """Verify validation succeeds for VPC cidr within default security group using ports.""" + ec2_client: EC2Client = boto3.client("ec2", "us-west-2") + stubber = Stubber(ec2_client) + + add_describe_vpc_response(stubber) + + stubber.add_response( + "describe_security_groups", + { + "SecurityGroups": [ + { + "Description": "test", + "GroupName": "test", + "IpPermissions": [ + { + "FromPort": 0, + "ToPort": 65535, + "IpRanges": [{"CidrIp": vpc_cidr}], + } + ], + } + ] + }, + expected_params={"GroupIds": [default_security_group]}, + ) + + with stubber: + func = expect_validation_success( + _aws_default_security_groups_contains_vpc_cidr_validation + ) + func(config, ec2_client) + + +@mock_iam +def test_aws_default_security_groups_contains_vpc_cidr_validation_succeeds_without_ports(): + """Verify validation succeeds for VPC cidr within default security group without ports.""" + ec2_client: EC2Client = boto3.client("ec2", "us-west-2") + stubber = Stubber(ec2_client) + + add_describe_vpc_response(stubber) + + stubber.add_response( + "describe_security_groups", + { + "SecurityGroups": [ + { + "Description": "test", + "GroupName": "test", + "IpPermissions": [ + { + "IpProtocol": "-1", + "IpRanges": [{"CidrIp": vpc_cidr}], + } + ], + } + ] + }, + expected_params={"GroupIds": [default_security_group]}, + ) + + with stubber: + func = expect_validation_success( + _aws_default_security_groups_contains_vpc_cidr_validation + ) + func(config, ec2_client) + + +@mock_iam +def test_aws_default_security_groups_contains_vpc_cidr_validation_fails(): + """Verify validation fails for VPC cidr within default security group.""" + ec2_client: EC2Client = boto3.client("ec2", "us-west-2") + stubber = Stubber(ec2_client) + + add_describe_vpc_response(stubber) + + stubber.add_response( + "describe_security_groups", + { + "SecurityGroups": [ + { + "Description": "test", + "GroupName": "test", + "IpPermissions": [ + { + "IpProtocol": "-1", + "IpRanges": [{"CidrIp": "fail_cidr"}], + } + ], + } + ] + }, + expected_params={"GroupIds": [default_security_group]}, + ) + + with stubber: + func = expect_validation_failure( + _aws_default_security_groups_contains_vpc_cidr_validation + ) + func(config, ec2_client)