From d3fbd88ca100da2bdddd3b3915bfd81b8bfeb766 Mon Sep 17 00:00:00 2001 From: nimrodkor Date: Sun, 27 Nov 2022 09:25:40 +0200 Subject: [PATCH] platform(secrets): Support custom detectors from the platform (#3926) --- .../secrets/plugins/custom_regex_detector.py | 110 ++++++++++++++ checkov/secrets/runner.py | 24 ++- .../secrets/custom_regex_detector/Dockerfile | 10 ++ tests/secrets/test_custom_regex_detector.py | 139 ++++++++++++++++++ 4 files changed, 269 insertions(+), 14 deletions(-) create mode 100644 checkov/secrets/plugins/custom_regex_detector.py create mode 100644 tests/secrets/custom_regex_detector/Dockerfile create mode 100644 tests/secrets/test_custom_regex_detector.py diff --git a/checkov/secrets/plugins/custom_regex_detector.py b/checkov/secrets/plugins/custom_regex_detector.py new file mode 100644 index 00000000000..f46a8274b6a --- /dev/null +++ b/checkov/secrets/plugins/custom_regex_detector.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +import logging +from typing import Set, Any, Generator, Pattern, Optional, Dict, Tuple, List, TYPE_CHECKING + +import yaml +from detect_secrets.constants import VerifiedResult +from detect_secrets.core.potential_secret import PotentialSecret +from detect_secrets.plugins.base import RegexBasedDetector +from detect_secrets.util.inject import call_function_with_arguments +import re + +from checkov.common.bridgecrew.platform_integration import bc_integration + +if TYPE_CHECKING: + from detect_secrets.util.code_snippet import CodeSnippet + + +def load_detectors() -> list[dict[str, Any]]: + detectors: List[dict[str, Any]] = [] + try: + customer_run_config_response = bc_integration.customer_run_config_response + policies_list: List[dict[str, Any]] = [] + if customer_run_config_response['secretsPolicies']: # type: ignore + policies_list = customer_run_config_response['secretsPolicies'] # type: ignore + except Exception as e: + logging.error(f"Failed to get detectors from customer_run_config_response, error: {e}") + return [] + + if policies_list: + detectors = modify_secrets_policy_to_detectors(policies_list) + if detectors: + logging.info(f"Successfully loaded {len(detectors)} detectors from bc_integration") + return detectors + + +def modify_secrets_policy_to_detectors(policies_list: List[dict[str, Any]]) -> List[dict[str, Any]]: + secrets_list = transforms_policies_to_detectors_list(policies_list) + logging.info(f"(modify_secrets_policy_to_detectors) secrets_list = {secrets_list}") + return secrets_list + + +def transforms_policies_to_detectors_list(custom_secrets: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + custom_detectors: List[Dict[str, Any]] = [] + for secret_policy in custom_secrets: + not_parsed = True + code = secret_policy['code'] + if code: + code_dict = yaml.safe_load(secret_policy['code']) + if 'definition' in code_dict: + if 'value' in code_dict['definition']: + not_parsed = False + for regex in code_dict['definition']['value']: + check_id = secret_policy['checkovCheckId'] if secret_policy['checkovCheckId'] else \ + secret_policy['incidentId'] + custom_detectors.append({'Name': secret_policy['title'], + 'Check_ID': check_id, + 'Regex': regex}) + if not_parsed: + logging.info(f"policy : {secret_policy} could not be parsed") + return custom_detectors + + +class CustomRegexDetector(RegexBasedDetector): + secret_type = "Regex Detector" # noqa: CCE003 # nosec + denylist: Set[Pattern[str]] = set() # noqa: CCE003 + + def __init__(self) -> None: + self.regex_to_metadata: dict[str, dict[str, Any]] = dict() + self.denylist = set() + detectors = load_detectors() + + for detector in detectors: + self.denylist.add(re.compile(r'{}'.format(detector["Regex"]))) + self.regex_to_metadata[detector["Regex"]] = detector + + def analyze_line( + self, + filename: str, + line: str, + line_number: int = 0, + context: Optional[CodeSnippet] = None, + raw_context: Optional[CodeSnippet] = None, + **kwargs: Any + ) -> Set[PotentialSecret]: + """This examines a line and finds all possible secret values in it.""" + output: Set[PotentialSecret] = set() + for match, regex in self.analyze_string(line, **kwargs): + try: + verified_result = call_function_with_arguments(self.verify, secret=match, context=context) + is_verified = True if verified_result == VerifiedResult.VERIFIED_TRUE else False + except Exception: + is_verified = False + + ps = PotentialSecret(type=self.regex_to_metadata[regex.pattern]["Name"], filename=filename, secret=match, + line_number=line_number, is_verified=is_verified) + ps.check_id = self.regex_to_metadata[regex.pattern]["Check_ID"] # type:ignore[attr-defined] + output.add(ps) + + return output + + def analyze_string(self, string: str, **kwargs: Optional[Dict[str, Any]]) -> Generator[Tuple[str, Pattern[str]], None, None]: # type:ignore[override] + for regex in self.denylist: + for match in regex.findall(string): + if isinstance(match, tuple): + for submatch in filter(bool, match): + # It might make sense to paste break after yielding + yield submatch, regex + else: + yield match, regex diff --git a/checkov/secrets/runner.py b/checkov/secrets/runner.py index e796441e55d..bc5cfda307f 100644 --- a/checkov/secrets/runner.py +++ b/checkov/secrets/runner.py @@ -5,7 +5,6 @@ import logging import os import re -from os.path import exists from pathlib import Path from typing import TYPE_CHECKING, cast @@ -13,6 +12,7 @@ from detect_secrets.core import scan from detect_secrets.settings import transient_settings + from checkov.common.bridgecrew.check_type import CheckType from checkov.common.bridgecrew.integration_features.features.policy_metadata_integration import \ integration as metadata_integration @@ -95,18 +95,15 @@ def run( {'name': 'TwilioKeyDetector'}, {'name': 'EntropyKeywordCombinator', 'path': f'file://{current_dir}/plugins/entropy_keyword_combinator.py'} ] - custom_plugins = os.getenv("CHECKOV_CUSTOM_DETECTOR_PLUGINS_PATH") - logging.info(f"Custom detector flag set to {custom_plugins}") - if custom_plugins: - detector_path = f"{custom_plugins}/custom_regex_detector.py" - if exists(detector_path): - logging.info(f"Custom detector found at {detector_path}. Loading...") - plugins_used.append({ - 'name': 'CustomRegexDetector', - 'path': f'file://{detector_path}' - }) - else: - logging.info(f"Custom detector not found at path {detector_path}. Skipping...") + + detector_path = f"{current_dir}/plugins/custom_regex_detector.py" + logging.info(f"Custom detector found at {detector_path}. Loading...") + enable_secret_scan_all_files = runner_filter.enable_secret_scan_all_files + if enable_secret_scan_all_files: + plugins_used.append({ + 'name': 'CustomRegexDetector', + 'path': f'file://{detector_path}' + }) with transient_settings({ # Only run scans with only these plugins. 'plugins_used': plugins_used @@ -119,7 +116,6 @@ def run( files_to_scan = files or [] excluded_paths = (runner_filter.excluded_paths or []) + ignored_directories + [DEFAULT_EXTERNAL_MODULES_DIR] if root_folder: - enable_secret_scan_all_files = runner_filter.enable_secret_scan_all_files block_list_secret_scan = runner_filter.block_list_secret_scan or [] block_list_secret_scan_lower = [file_type.lower() for file_type in block_list_secret_scan] for root, d_names, f_names in os.walk(root_folder): diff --git a/tests/secrets/custom_regex_detector/Dockerfile b/tests/secrets/custom_regex_detector/Dockerfile new file mode 100644 index 00000000000..eb5a2b6e5c8 --- /dev/null +++ b/tests/secrets/custom_regex_detector/Dockerfile @@ -0,0 +1,10 @@ +FROM base:1 +ENV aws_access_key=AKIA4NACSIJMDDNSEDTE +USER bob +ENV AWS_ACCESS_KEY_ID="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" +ENV SEC_4="dsapi45202d12abdce73c004a9e0be24a21b2" +ENV MY_SEC_1="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" +ENV MY_SEC_2="glpat-KDNon6sfvHRKL8NtFfNR" +ENV CIRCLE="2065ae463be4e434bb1d074a366d44e7a776d472" +ENV SEC_3="eyJrIjoiNUwyZU7TMmRxQXNVcnR7UXB0ME4zYkhRaTk2STVhR0MiLCJuIjoidGVtcCIsImlkIjoxfQ==" +ENV JIRA="5FP0NmFYz81U32XdjNb42762" \ No newline at end of file diff --git a/tests/secrets/test_custom_regex_detector.py b/tests/secrets/test_custom_regex_detector.py new file mode 100644 index 00000000000..279dfc4b211 --- /dev/null +++ b/tests/secrets/test_custom_regex_detector.py @@ -0,0 +1,139 @@ +import unittest +from typing import Any, Dict, List + +from checkov.common.bridgecrew.platform_integration import bc_integration +from checkov.secrets.plugins.custom_regex_detector import modify_secrets_policy_to_detectors, CustomRegexDetector + + +class TestCustomRegexDetector(unittest.TestCase): + + def test_modify_secrets_policy_to_detectors(self) -> None: + policies_list: List[Dict[str, Any]] = [ + { + "incidentId": "lshindelman1_AWS_1666860510378", + "category": "Secrets", + "severity": "MEDIUM", + "incidentType": "Violation", + "title": "test", + "guideline": "test", + "laceworkViolationId": None, + "prowlerCheckId": None, + "checkovCheckId": None, + "resourceTypes": [ + "aws_instance" + ], + "provider": "AWS", + "remediationIds": [], + "conditionQuery": { + "or": [ + { + "value": "t3.micro", + "operator": "equals", + "attribute": "instance_type", + "cond_type": "attribute", + "resource_types": [ + "aws_instance" + ] + }, + { + "value": "t3.nano", + "operator": "equals", + "attribute": "instance_type", + "cond_type": "attribute", + "resource_types": [ + "aws_instance" + ] + } + ] + }, + "customerName": "lshindelman1", + "isCustom": True, + "createdBy": "lshindelman+1@paloaltonetworks.com", + "code": "---\nmetadata:\n name: \"test\" #give your custom policy a unique name \n guidelines: \"test\" #add text that explains the configuration the policy looks for, its implications, and how to fix it\n category: \"secrets\" #choose one: \"general\"/\"elasticsearch\"/\"iam\"/\"kubernetes\"/\"logging\"/\"monitoring\"/\"networking\"/\"public\"/\"secrets\"/\"serverless\"/\"storage\"\n severity: \"medium\" #choose one: \"critical\"/\"high\"/\"medium\"/\"low\"\nscope:\n provider: \"aws\" #choose one: \"aws\"/\"azure\"/\"gcp\"/\"kubernetes\"\ndefinition: #define the conditions the policy searches for.\n# The example below checks EC2s with instance_type t3.micro or t3.nano. for more examples please visit our docs - https://docs.bridgecrew.io/docs/yaml-format-for-custom-policies\n or:\n - cond_type: \"attribute\"\n resource_types:\n - \"aws_instance\"\n attribute: \"instance_type\"\n operator: \"equals\"\n value: \"t3.micro\"\n - cond_type: \"attribute\"\n resource_types:\n - \"aws_instance\"\n attribute: \"instance_type\"\n operator: \"equals\"\n value: \"t3.nano\"\n", + "descriptiveTitle": None, + "constructiveTitle": None, + "pcPolicyId": None, + "additionalPcPolicyIds": None, + "frameworks": [ + "CloudFormation", + "Terraform" + ], + "pcSeverity": None, + "sourceIncidentId": None + } + ] + detector_obj = modify_secrets_policy_to_detectors(policies_list) + detectors_result: List[Dict[str, Any]] = [] + detector_obj.sort(key=lambda detector: detector['Check_ID']) + detectors_result.sort(key=lambda detector: detector['Check_ID']) # type: ignore + assert all( + True for x in range(0, len(detector_obj)) if detector_obj[x]['Check_ID'] == detectors_result[x]['Check_ID']) + assert len(detectors_result) == len(detector_obj) + + def test_test_custom_regex_detector(self) -> None: + bc_integration.customer_run_config_response = {"secretsPolicies": [ + { + "incidentId": "lshindelman1_AWS_1666860510378", + "category": "Secrets", + "severity": "MEDIUM", + "incidentType": "Violation", + "title": "test", + "guideline": "test", + "laceworkViolationId": None, + "prowlerCheckId": None, + "checkovCheckId": None, + "resourceTypes": + [ + "aws_instance" + ], + "provider": "AWS", + "remediationIds": + [], + "conditionQuery": + { + "or": + [ + { + "value": "t3.micro", + "operator": "equals", + "attribute": "instance_type", + "cond_type": "attribute", + "resource_types": + [ + "aws_instance" + ] + }, + { + "value": "t3.nano", + "operator": "equals", + "attribute": "instance_type", + "cond_type": "attribute", + "resource_types": + [ + "aws_instance" + ] + } + ] + }, + "customerName": "lshindelman1", + "isCustom": True, + "createdBy": "lshindelman+1@paloaltonetworks.com", + "code": "---\nmetadata:\n name: \"test\" #give your custom policy a unique name \n guidelines: \"test\" #add text that explains the configuration the policy looks for, its implications, and how to fix it\n category: \"secrets\" #choose one: \"general\"/\"elasticsearch\"/\"iam\"/\"kubernetes\"/\"logging\"/\"monitoring\"/\"networking\"/\"public\"/\"secrets\"/\"serverless\"/\"storage\"\n severity: \"medium\" #choose one: \"critical\"/\"high\"/\"medium\"/\"low\"\nscope:\n provider: \"aws\" #choose one: \"aws\"/\"azure\"/\"gcp\"/\"kubernetes\"\ndefinition: #define the conditions the policy searches for.\n# The example below checks EC2s with instance_type t3.micro or t3.nano. for more examples please visit our docs - https://docs.bridgecrew.io/docs/yaml-format-for-custom-policies\n or:\n - cond_type: \"attribute\"\n resource_types:\n - \"aws_instance\"\n attribute: \"instance_type\"\n operator: \"equals\"\n value: \"t3.micro\"\n - cond_type: \"attribute\"\n resource_types:\n - \"aws_instance\"\n attribute: \"instance_type\"\n operator: \"equals\"\n value: \"t3.nano\"\n", + "descriptiveTitle": None, + "constructiveTitle": None, + "pcPolicyId": None, + "additionalPcPolicyIds": None, + "frameworks": + [ + "CloudFormation", + "Terraform" + ], + "pcSeverity": None, + "sourceIncidentId": None + } + ]} + + detector_obj = CustomRegexDetector() + + assert len(detector_obj.denylist) == 0 + assert len(detector_obj.regex_to_metadata) == 0