-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
platform(secrets): Support custom detectors from the platform (#3926)
- Loading branch information
Showing
4 changed files
with
269 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
from typing import Set, Any, Generator, Pattern, Optional, Dict, Tuple, List, TYPE_CHECKING | ||
|
||
import yaml | ||
from detect_secrets.constants import VerifiedResult | ||
from detect_secrets.core.potential_secret import PotentialSecret | ||
from detect_secrets.plugins.base import RegexBasedDetector | ||
from detect_secrets.util.inject import call_function_with_arguments | ||
import re | ||
|
||
from checkov.common.bridgecrew.platform_integration import bc_integration | ||
|
||
if TYPE_CHECKING: | ||
from detect_secrets.util.code_snippet import CodeSnippet | ||
|
||
|
||
def load_detectors() -> list[dict[str, Any]]: | ||
detectors: List[dict[str, Any]] = [] | ||
try: | ||
customer_run_config_response = bc_integration.customer_run_config_response | ||
policies_list: List[dict[str, Any]] = [] | ||
if customer_run_config_response['secretsPolicies']: # type: ignore | ||
policies_list = customer_run_config_response['secretsPolicies'] # type: ignore | ||
except Exception as e: | ||
logging.error(f"Failed to get detectors from customer_run_config_response, error: {e}") | ||
return [] | ||
|
||
if policies_list: | ||
detectors = modify_secrets_policy_to_detectors(policies_list) | ||
if detectors: | ||
logging.info(f"Successfully loaded {len(detectors)} detectors from bc_integration") | ||
return detectors | ||
|
||
|
||
def modify_secrets_policy_to_detectors(policies_list: List[dict[str, Any]]) -> List[dict[str, Any]]: | ||
secrets_list = transforms_policies_to_detectors_list(policies_list) | ||
logging.info(f"(modify_secrets_policy_to_detectors) secrets_list = {secrets_list}") | ||
return secrets_list | ||
|
||
|
||
def transforms_policies_to_detectors_list(custom_secrets: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | ||
custom_detectors: List[Dict[str, Any]] = [] | ||
for secret_policy in custom_secrets: | ||
not_parsed = True | ||
code = secret_policy['code'] | ||
if code: | ||
code_dict = yaml.safe_load(secret_policy['code']) | ||
if 'definition' in code_dict: | ||
if 'value' in code_dict['definition']: | ||
not_parsed = False | ||
for regex in code_dict['definition']['value']: | ||
check_id = secret_policy['checkovCheckId'] if secret_policy['checkovCheckId'] else \ | ||
secret_policy['incidentId'] | ||
custom_detectors.append({'Name': secret_policy['title'], | ||
'Check_ID': check_id, | ||
'Regex': regex}) | ||
if not_parsed: | ||
logging.info(f"policy : {secret_policy} could not be parsed") | ||
return custom_detectors | ||
|
||
|
||
class CustomRegexDetector(RegexBasedDetector): | ||
secret_type = "Regex Detector" # noqa: CCE003 # nosec | ||
denylist: Set[Pattern[str]] = set() # noqa: CCE003 | ||
|
||
def __init__(self) -> None: | ||
self.regex_to_metadata: dict[str, dict[str, Any]] = dict() | ||
self.denylist = set() | ||
detectors = load_detectors() | ||
|
||
for detector in detectors: | ||
self.denylist.add(re.compile(r'{}'.format(detector["Regex"]))) | ||
self.regex_to_metadata[detector["Regex"]] = detector | ||
|
||
def analyze_line( | ||
self, | ||
filename: str, | ||
line: str, | ||
line_number: int = 0, | ||
context: Optional[CodeSnippet] = None, | ||
raw_context: Optional[CodeSnippet] = None, | ||
**kwargs: Any | ||
) -> Set[PotentialSecret]: | ||
"""This examines a line and finds all possible secret values in it.""" | ||
output: Set[PotentialSecret] = set() | ||
for match, regex in self.analyze_string(line, **kwargs): | ||
try: | ||
verified_result = call_function_with_arguments(self.verify, secret=match, context=context) | ||
is_verified = True if verified_result == VerifiedResult.VERIFIED_TRUE else False | ||
except Exception: | ||
is_verified = False | ||
|
||
ps = PotentialSecret(type=self.regex_to_metadata[regex.pattern]["Name"], filename=filename, secret=match, | ||
line_number=line_number, is_verified=is_verified) | ||
ps.check_id = self.regex_to_metadata[regex.pattern]["Check_ID"] # type:ignore[attr-defined] | ||
output.add(ps) | ||
|
||
return output | ||
|
||
def analyze_string(self, string: str, **kwargs: Optional[Dict[str, Any]]) -> Generator[Tuple[str, Pattern[str]], None, None]: # type:ignore[override] | ||
for regex in self.denylist: | ||
for match in regex.findall(string): | ||
if isinstance(match, tuple): | ||
for submatch in filter(bool, match): | ||
# It might make sense to paste break after yielding | ||
yield submatch, regex | ||
else: | ||
yield match, regex |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
FROM base:1 | ||
ENV aws_access_key=AKIA4NACSIJMDDNSEDTE | ||
USER bob | ||
ENV AWS_ACCESS_KEY_ID="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" | ||
ENV SEC_4="dsapi45202d12abdce73c004a9e0be24a21b2" | ||
ENV MY_SEC_1="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" | ||
ENV MY_SEC_2="glpat-KDNon6sfvHRKL8NtFfNR" | ||
ENV CIRCLE="2065ae463be4e434bb1d074a366d44e7a776d472" | ||
ENV SEC_3="eyJrIjoiNUwyZU7TMmRxQXNVcnR7UXB0ME4zYkhRaTk2STVhR0MiLCJuIjoidGVtcCIsImlkIjoxfQ==" | ||
ENV JIRA="5FP0NmFYz81U32XdjNb42762" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
import unittest | ||
from typing import Any, Dict, List | ||
|
||
from checkov.common.bridgecrew.platform_integration import bc_integration | ||
from checkov.secrets.plugins.custom_regex_detector import modify_secrets_policy_to_detectors, CustomRegexDetector | ||
|
||
|
||
class TestCustomRegexDetector(unittest.TestCase): | ||
|
||
def test_modify_secrets_policy_to_detectors(self) -> None: | ||
policies_list: List[Dict[str, Any]] = [ | ||
{ | ||
"incidentId": "lshindelman1_AWS_1666860510378", | ||
"category": "Secrets", | ||
"severity": "MEDIUM", | ||
"incidentType": "Violation", | ||
"title": "test", | ||
"guideline": "test", | ||
"laceworkViolationId": None, | ||
"prowlerCheckId": None, | ||
"checkovCheckId": None, | ||
"resourceTypes": [ | ||
"aws_instance" | ||
], | ||
"provider": "AWS", | ||
"remediationIds": [], | ||
"conditionQuery": { | ||
"or": [ | ||
{ | ||
"value": "t3.micro", | ||
"operator": "equals", | ||
"attribute": "instance_type", | ||
"cond_type": "attribute", | ||
"resource_types": [ | ||
"aws_instance" | ||
] | ||
}, | ||
{ | ||
"value": "t3.nano", | ||
"operator": "equals", | ||
"attribute": "instance_type", | ||
"cond_type": "attribute", | ||
"resource_types": [ | ||
"aws_instance" | ||
] | ||
} | ||
] | ||
}, | ||
"customerName": "lshindelman1", | ||
"isCustom": True, | ||
"createdBy": "[email protected]", | ||
"code": "---\nmetadata:\n name: \"test\" #give your custom policy a unique name \n guidelines: \"test\" #add text that explains the configuration the policy looks for, its implications, and how to fix it\n category: \"secrets\" #choose one: \"general\"/\"elasticsearch\"/\"iam\"/\"kubernetes\"/\"logging\"/\"monitoring\"/\"networking\"/\"public\"/\"secrets\"/\"serverless\"/\"storage\"\n severity: \"medium\" #choose one: \"critical\"/\"high\"/\"medium\"/\"low\"\nscope:\n provider: \"aws\" #choose one: \"aws\"/\"azure\"/\"gcp\"/\"kubernetes\"\ndefinition: #define the conditions the policy searches for.\n# The example below checks EC2s with instance_type t3.micro or t3.nano. for more examples please visit our docs - https://docs.bridgecrew.io/docs/yaml-format-for-custom-policies\n or:\n - cond_type: \"attribute\"\n resource_types:\n - \"aws_instance\"\n attribute: \"instance_type\"\n operator: \"equals\"\n value: \"t3.micro\"\n - cond_type: \"attribute\"\n resource_types:\n - \"aws_instance\"\n attribute: \"instance_type\"\n operator: \"equals\"\n value: \"t3.nano\"\n", | ||
"descriptiveTitle": None, | ||
"constructiveTitle": None, | ||
"pcPolicyId": None, | ||
"additionalPcPolicyIds": None, | ||
"frameworks": [ | ||
"CloudFormation", | ||
"Terraform" | ||
], | ||
"pcSeverity": None, | ||
"sourceIncidentId": None | ||
} | ||
] | ||
detector_obj = modify_secrets_policy_to_detectors(policies_list) | ||
detectors_result: List[Dict[str, Any]] = [] | ||
detector_obj.sort(key=lambda detector: detector['Check_ID']) | ||
detectors_result.sort(key=lambda detector: detector['Check_ID']) # type: ignore | ||
assert all( | ||
True for x in range(0, len(detector_obj)) if detector_obj[x]['Check_ID'] == detectors_result[x]['Check_ID']) | ||
assert len(detectors_result) == len(detector_obj) | ||
|
||
def test_test_custom_regex_detector(self) -> None: | ||
bc_integration.customer_run_config_response = {"secretsPolicies": [ | ||
{ | ||
"incidentId": "lshindelman1_AWS_1666860510378", | ||
"category": "Secrets", | ||
"severity": "MEDIUM", | ||
"incidentType": "Violation", | ||
"title": "test", | ||
"guideline": "test", | ||
"laceworkViolationId": None, | ||
"prowlerCheckId": None, | ||
"checkovCheckId": None, | ||
"resourceTypes": | ||
[ | ||
"aws_instance" | ||
], | ||
"provider": "AWS", | ||
"remediationIds": | ||
[], | ||
"conditionQuery": | ||
{ | ||
"or": | ||
[ | ||
{ | ||
"value": "t3.micro", | ||
"operator": "equals", | ||
"attribute": "instance_type", | ||
"cond_type": "attribute", | ||
"resource_types": | ||
[ | ||
"aws_instance" | ||
] | ||
}, | ||
{ | ||
"value": "t3.nano", | ||
"operator": "equals", | ||
"attribute": "instance_type", | ||
"cond_type": "attribute", | ||
"resource_types": | ||
[ | ||
"aws_instance" | ||
] | ||
} | ||
] | ||
}, | ||
"customerName": "lshindelman1", | ||
"isCustom": True, | ||
"createdBy": "[email protected]", | ||
"code": "---\nmetadata:\n name: \"test\" #give your custom policy a unique name \n guidelines: \"test\" #add text that explains the configuration the policy looks for, its implications, and how to fix it\n category: \"secrets\" #choose one: \"general\"/\"elasticsearch\"/\"iam\"/\"kubernetes\"/\"logging\"/\"monitoring\"/\"networking\"/\"public\"/\"secrets\"/\"serverless\"/\"storage\"\n severity: \"medium\" #choose one: \"critical\"/\"high\"/\"medium\"/\"low\"\nscope:\n provider: \"aws\" #choose one: \"aws\"/\"azure\"/\"gcp\"/\"kubernetes\"\ndefinition: #define the conditions the policy searches for.\n# The example below checks EC2s with instance_type t3.micro or t3.nano. for more examples please visit our docs - https://docs.bridgecrew.io/docs/yaml-format-for-custom-policies\n or:\n - cond_type: \"attribute\"\n resource_types:\n - \"aws_instance\"\n attribute: \"instance_type\"\n operator: \"equals\"\n value: \"t3.micro\"\n - cond_type: \"attribute\"\n resource_types:\n - \"aws_instance\"\n attribute: \"instance_type\"\n operator: \"equals\"\n value: \"t3.nano\"\n", | ||
"descriptiveTitle": None, | ||
"constructiveTitle": None, | ||
"pcPolicyId": None, | ||
"additionalPcPolicyIds": None, | ||
"frameworks": | ||
[ | ||
"CloudFormation", | ||
"Terraform" | ||
], | ||
"pcSeverity": None, | ||
"sourceIncidentId": None | ||
} | ||
]} | ||
|
||
detector_obj = CustomRegexDetector() | ||
|
||
assert len(detector_obj.denylist) == 0 | ||
assert len(detector_obj.regex_to_metadata) == 0 |