From ba1727140a1bddb14eec06da7d25b8024e483bca Mon Sep 17 00:00:00 2001 From: Mike Urbanski Date: Mon, 19 Sep 2022 12:55:53 -0500 Subject: [PATCH 1/4] handle fixes for cloned OOTB policies --- .../features/fixes_integration.py | 8 +- .../integration_features/resources/main.tf | 3 + .../test_fixes_integration.py | 108 ++++++++++++++++++ 3 files changed, 117 insertions(+), 2 deletions(-) create mode 100644 tests/common/integration_features/resources/main.tf diff --git a/checkov/common/bridgecrew/integration_features/features/fixes_integration.py b/checkov/common/bridgecrew/integration_features/features/fixes_integration.py index 63befeeaf8a..01f6eae0e66 100644 --- a/checkov/common/bridgecrew/integration_features/features/fixes_integration.py +++ b/checkov/common/bridgecrew/integration_features/features/fixes_integration.py @@ -69,9 +69,13 @@ def _get_platform_fixes(self, scan_report: Report) -> None: for fix in all_fixes: ckv_id = metadata_integration.get_ckv_id_from_bc_id(fix['policyId']) if not ckv_id: - logging.error(f"BC ID {fix['policyId']} has no checkov ID") + logging.debug(f"BC ID {fix['policyId']} has no checkov ID - might be a cloned policy") + ckv_id = fix['policyId'] + + failed_check = failed_check_by_check_resource.get((ckv_id, fix['resourceId'])) + if not failed_check: + logging.warning(f'Could not find the corresponding failed check for the fix for ID {ckv_id} and resource {fix["resourceId"]}') continue - failed_check = failed_check_by_check_resource[(ckv_id, fix['resourceId'])] failed_check.fixed_definition = fix['fixedDefinition'] def _get_fixes_for_file( diff --git a/tests/common/integration_features/resources/main.tf b/tests/common/integration_features/resources/main.tf new file mode 100644 index 00000000000..5a9126a80df --- /dev/null +++ b/tests/common/integration_features/resources/main.tf @@ -0,0 +1,3 @@ +resource "aws_subnet" "s" { + map_public_ip_on_launch = true +} diff --git a/tests/common/integration_features/test_fixes_integration.py b/tests/common/integration_features/test_fixes_integration.py index 9e65aabfb37..615c1871fb1 100644 --- a/tests/common/integration_features/test_fixes_integration.py +++ b/tests/common/integration_features/test_fixes_integration.py @@ -1,7 +1,19 @@ +from __future__ import annotations + +import os import unittest +from typing import Any, Iterable +from checkov.common.bridgecrew.check_type import CheckType from checkov.common.bridgecrew.integration_features.features.fixes_integration import FixesIntegration +from checkov.common.bridgecrew.integration_features.features.policy_metadata_integration import integration as metadata_integration from checkov.common.bridgecrew.platform_integration import BcPlatformIntegration +from checkov.common.models.enums import CheckResult +from checkov.common.output.record import Record +from checkov.common.output.report import Report + + +_old_check_metadata = None class TestFixesIntegration(unittest.TestCase): @@ -26,6 +38,102 @@ def test_integration_valid(self): fixes_integration.integration_feature_failures = True self.assertFalse(fixes_integration.is_valid()) + def test_apply_fixes_to_report(self): + instance = BcPlatformIntegration() + instance.skip_fixes = False + instance.platform_integration_configured = True + + fixes_integration = FixesIntegration(instance) + fixes_integration._get_fixes_for_file = mock_fixes_response + + metadata_integration.check_metadata = { + 'custom_aws_12345': {'guideline': 'https://docs.bridgecrew.io/docs/ensure-vpc-subnets-do-not-assign-public-ip-by-default'}, + 'CKV_AWS_130': { + 'id': 'BC_AWS_NETWORKING_53', + 'title': 'Ensure VPC subnets do not assign public IP by default', + 'guideline': 'https://docs.bridgecrew.io/docs/ensure-vpc-subnets-do-not-assign-public-ip-by-default', + 'severity': 'MEDIUM', + 'pcSeverity': 'MEDIUM', + 'category': 'Networking', + 'checkovId': 'CKV_AWS_130', + 'constructiveTitle': 'Ensure VPC subnets do not assign public IP by default', + 'descriptiveTitle': 'AWS VPC subnets should not allow automatic public IP assignment', + 'pcPolicyId': '11743cd3-35e4-4639-91e1-bc87b52d4cf5', + 'additionalPcPolicyIds': ['11743cd3-35e4-4639-91e1-bc87b52d4cf5'], + 'benchmarks': {} + } + } + + metadata_integration.bc_to_ckv_id_mapping = { + 'BC_AWS_NETWORKING_53': 'CKV_AWS_130' + } + + report = Report(CheckType.TERRAFORM) + + file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'resources', 'main.tf') + + report.add_record(Record( + check_id='CKV_AWS_130', + bc_check_id='BC_AWS_NETWORKING_53', + check_result={'result': CheckResult.FAILED, 'evaluated_keys': ['map_public_ip_on_launch']}, + check_name='Ensure VPC subnets do not assign public IP by default', + check_class='checkov.terraform.checks.resource.aws.SubnetPublicIP', + code_block=[(1, 'resource "aws_subnet" "s" {\n'), (2, ' map_public_ip_on_launch = true\n'), (3, '}\n')], + evaluations=None, + file_abs_path=file, + file_line_range=[2, 3], + file_path='/main.tf', + resource='aws_subnet.s' + )) + + report.add_record(Record( + check_id='custom_aws_12345', + bc_check_id='custom_aws_12345', + check_result={'result': CheckResult.FAILED, 'evaluated_keys': ['map_public_ip_on_launch']}, + check_name='Cloned-Ensure VPC subnets do not assign public IP by default', + check_class='checkov.terraform.checks.resource.aws.SubnetPublicIP', + code_block=[(1, 'resource "aws_subnet" "s" {\n'), (2, ' map_public_ip_on_launch = true\n'), (3, '}\n')], + evaluations=None, + file_abs_path=file, + file_line_range=[2, 3], + file_path='/main.tf', + resource='aws_subnet.s' + )) + + fixes_integration.post_runner(report) + + self.assertTrue(all(r.fixed_definition is not None for r in report.failed_checks)) + + def setUp(self) -> None: + global _old_check_metadata + _old_check_metadata = metadata_integration.check_metadata + + def tearDown(self) -> None: + metadata_integration.check_metadata = _old_check_metadata + + +def mock_fixes_response(check_type: str, filename: str, file_contents: str, failed_checks: Iterable[Record] + ) -> dict[str, Any] | None: + return { + 'filePath': '/private/tmp/custom/main.tf', + 'fixes': [ + { + 'resourceId': 'aws_subnet.s', + 'policyId': 'BC_AWS_NETWORKING_53', + 'originalStartLine': 1, + 'originalEndLine': 3, + 'fixedDefinition': 'resource "aws_subnet" "s" {\n}\n' + }, + { + 'resourceId': 'aws_subnet.s', + 'policyId': 'custom_aws_12345', + 'originalStartLine': 1, + 'originalEndLine': 3, + 'fixedDefinition': 'resource "aws_subnet" "s" {\n}\n' + } + ] + } + if __name__ == '__main__': unittest.main() From d9f1f62581dec74fa7bb42672c600c11df9ea73f Mon Sep 17 00:00:00 2001 From: Mike Urbanski Date: Mon, 19 Sep 2022 13:04:39 -0500 Subject: [PATCH 2/4] add mypy skip --- .../integration_features/features/fixes_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/checkov/common/bridgecrew/integration_features/features/fixes_integration.py b/checkov/common/bridgecrew/integration_features/features/fixes_integration.py index 01f6eae0e66..632a90c1fca 100644 --- a/checkov/common/bridgecrew/integration_features/features/fixes_integration.py +++ b/checkov/common/bridgecrew/integration_features/features/fixes_integration.py @@ -72,7 +72,7 @@ def _get_platform_fixes(self, scan_report: Report) -> None: logging.debug(f"BC ID {fix['policyId']} has no checkov ID - might be a cloned policy") ckv_id = fix['policyId'] - failed_check = failed_check_by_check_resource.get((ckv_id, fix['resourceId'])) + failed_check = failed_check_by_check_resource.get((ckv_id, fix['resourceId'])) # type:ignore[arg-type] # ckv_id is not None here if not failed_check: logging.warning(f'Could not find the corresponding failed check for the fix for ID {ckv_id} and resource {fix["resourceId"]}') continue From 5966d006e493f8f99eaf3b4852ffafdd4c34c30e Mon Sep 17 00:00:00 2001 From: Mike Urbanski Date: Mon, 19 Sep 2022 13:05:48 -0500 Subject: [PATCH 3/4] make ckv_id explicitly not none to be safe --- .../integration_features/features/fixes_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/checkov/common/bridgecrew/integration_features/features/fixes_integration.py b/checkov/common/bridgecrew/integration_features/features/fixes_integration.py index 632a90c1fca..a19e369a224 100644 --- a/checkov/common/bridgecrew/integration_features/features/fixes_integration.py +++ b/checkov/common/bridgecrew/integration_features/features/fixes_integration.py @@ -70,7 +70,7 @@ def _get_platform_fixes(self, scan_report: Report) -> None: ckv_id = metadata_integration.get_ckv_id_from_bc_id(fix['policyId']) if not ckv_id: logging.debug(f"BC ID {fix['policyId']} has no checkov ID - might be a cloned policy") - ckv_id = fix['policyId'] + ckv_id = fix.get('policyId', '') failed_check = failed_check_by_check_resource.get((ckv_id, fix['resourceId'])) # type:ignore[arg-type] # ckv_id is not None here if not failed_check: From 340e9eb3f928d1c86725a01c9402d43dfa3e2ae0 Mon Sep 17 00:00:00 2001 From: mikeurbanski1 Date: Mon, 19 Sep 2022 13:06:38 -0500 Subject: [PATCH 4/4] Update tests/common/integration_features/test_fixes_integration.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Anton GrĂ¼bel --- tests/common/integration_features/test_fixes_integration.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/common/integration_features/test_fixes_integration.py b/tests/common/integration_features/test_fixes_integration.py index 615c1871fb1..ddeec2ace38 100644 --- a/tests/common/integration_features/test_fixes_integration.py +++ b/tests/common/integration_features/test_fixes_integration.py @@ -105,11 +105,10 @@ def test_apply_fixes_to_report(self): self.assertTrue(all(r.fixed_definition is not None for r in report.failed_checks)) def setUp(self) -> None: - global _old_check_metadata - _old_check_metadata = metadata_integration.check_metadata + self._old_check_metadata = metadata_integration.check_metadata def tearDown(self) -> None: - metadata_integration.check_metadata = _old_check_metadata + metadata_integration.check_metadata = self._old_check_metadata def mock_fixes_response(check_type: str, filename: str, file_contents: str, failed_checks: Iterable[Record]