Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(secrets): change entropy limit in Combinator plugin #3575

Merged
merged 12 commits into from
Sep 29, 2022
26 changes: 14 additions & 12 deletions checkov/secrets/plugins/entropy_keyword_combinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
from detect_secrets.util.code_snippet import CodeSnippet

MAX_LINE_LENGTH = 10000
ENTROPY_KEYWORD_COMBINATOR_LIMIT = 3
ENTROPY_KEYWORD_LIMIT = 4.5


class EntropyKeywordCombinator(BasePlugin):
secret_type = "" # nosec # noqa: CCE003 # a static attribute

def __init__(self, limit: float) -> None:
def __init__(self, limit: float = ENTROPY_KEYWORD_LIMIT) -> None:
iac_limit = ENTROPY_KEYWORD_COMBINATOR_LIMIT
self.high_entropy_scanners_iac = (Base64HighEntropyString(limit=iac_limit), HexHighEntropyString(limit=iac_limit))
self.high_entropy_scanners = (Base64HighEntropyString(limit=limit), HexHighEntropyString(limit=limit))
self.keyword_scanner = KeywordDetector()

Expand All @@ -34,18 +38,16 @@ def analyze_line(
one of the entropy scanners find a match (on a line which was already matched by keyword plugin) - it is returned.
for source code files run and merge the two plugins.
"""
entropy_matches = set()
is_iac = True if f".{filename.split('.')[-1]}" not in SOURCE_CODE_EXTENSION else False
nimrodkor marked this conversation as resolved.
Show resolved Hide resolved
if len(line) <= MAX_LINE_LENGTH:
keyword_matches = self.keyword_scanner.analyze_line(filename, line, line_number, **kwargs)
if f".{filename.split('.')[-1]}" in SOURCE_CODE_EXTENSION:
for entropy_scanner in self.high_entropy_scanners:
matches = entropy_scanner.analyze_line(filename, line, line_number, **kwargs)
if matches:
entropy_matches = matches
break
keyword_entropy = keyword_matches.union(entropy_matches)
return keyword_entropy
if keyword_matches:
if is_iac:
keyword_matches = self.keyword_scanner.analyze_line(filename, line, line_number, **kwargs)
if keyword_matches:
for entropy_scanner in self.high_entropy_scanners_iac:
matches = entropy_scanner.analyze_line(filename, line, line_number, **kwargs)
if matches:
return matches
else:
for entropy_scanner in self.high_entropy_scanners:
matches = entropy_scanner.analyze_line(filename, line, line_number, **kwargs)
if matches:
Expand Down
63 changes: 17 additions & 46 deletions checkov/secrets/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
}
CHECK_ID_TO_SECRET_TYPE = {v: k for k, v in SECRET_TYPE_TO_ID.items()}

ENTROPY_KEYWORD_LIMIT = 3
PROHIBITED_FILES = ['Pipfile.lock', 'yarn.lock', 'package-lock.json', 'requirements.txt']

MAX_FILE_SIZE = int(os.getenv('CHECKOV_MAX_FILE_SIZE', '5000000')) # 5 MB is default limit
Expand All @@ -69,62 +68,34 @@
class Runner(BaseRunner[None]):
check_type = CheckType.SECRETS # noqa: CCE003 # a static attribute

def run(
self,
def run(self,
root_folder: str | None,
external_checks_dir: list[str] | None = None,
files: list[str] | None = None,
runner_filter: RunnerFilter | None = None,
collect_skip_comments: bool = True
) -> Report:
) -> Report:
runner_filter = runner_filter or RunnerFilter()
current_dir = Path(__file__).parent
secrets = SecretsCollection()
plugins_used = \
[
{
'name': 'AWSKeyDetector'
},
{
'name': 'ArtifactoryDetector'
},
{
'name': 'AzureStorageKeyDetector'
},
{
'name': 'BasicAuthDetector'
},
{
'name': 'CloudantDetector'
},
{
'name': 'IbmCloudIamDetector'
},
{
'name': 'MailchimpDetector'
},
{
'name': 'PrivateKeyDetector'
},
{
'name': 'SlackDetector'
},
{
'name': 'SoftlayerDetector'
},
{
'name': 'SquareOAuthDetector'
},
{
'name': 'StripeDetector'
},
{
'name': 'TwilioKeyDetector'
},
{'name': 'AWSKeyDetector'},
{'name': 'ArtifactoryDetector'},
{'name': 'AzureStorageKeyDetector'},
{'name': 'BasicAuthDetector'},
{'name': 'CloudantDetector'},
{'name': 'IbmCloudIamDetector'},
{'name': 'MailchimpDetector'},
{'name': 'PrivateKeyDetector'},
{'name': 'SlackDetector'},
{'name': 'SoftlayerDetector'},
{'name': 'SquareOAuthDetector'},
{'name': 'StripeDetector'},
{'name': 'TwilioKeyDetector'},
{
'name': 'EntropyKeywordCombinator',
'path': f'file://{current_dir}/plugins/entropy_keyword_combinator.py',
'limit': ENTROPY_KEYWORD_LIMIT
'path': f'file://{current_dir}/plugins/entropy_keyword_combinator.py'
}
]
custom_plugins = os.getenv("CHECKOV_CUSTOM_DETECTOR_PLUGINS_PATH")
Expand Down Expand Up @@ -284,4 +255,4 @@ def search_for_suppression(
"result": CheckResult.SKIPPED,
"suppress_comment": skip_search.group(3)[1:] if skip_search.group(3) else "No comment provided"
}
return None
return None
22 changes: 22 additions & 0 deletions tests/secrets/resources/cfn/secret-no-false-positive.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
no False Positive - where it's not an actual secret
"""

CleanBucketFunction:
Type: "AWS::Lambda::Function"
DependsOn: CleanupRole
Properties:
Handler: index.clearS3Bucket
Role:
Fn::GetAtt: CleanupRole.Arn
Runtime: nodejs12.x
Timeout: 25
Code:
ZipFile: |
no False Positive - where it's not an actual secret
check_metadata_values = ('bafadssda$#%2', 'bdfsver#$@%')
CHECKOV_METADATA_RESULT = 'checkov_results5243gvr'
check1 = {'blabla': 'blabla1'}
check2 = {'blabla': 'blabla2'}
check1['some_key_1235#$@'] = check2.get('some_value_1235')
not_a_secr_k = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
8 changes: 8 additions & 0 deletions tests/secrets/resources/file_type/test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# no False Positive - where it's not an actual secret
check_metadata_values = ('bafadssda$#%2', 'bdfsver#$@%')
CHECKOV_METADATA_RESULT = 'checkov_results5243gvr'
check1 = {'blabla': 'blabla1'}
check2 = {'blabla': 'blabla2'}
check1['some_key_1235#$@'] = check2.get('some_value_1235')


access_key = "AKIAIOSFODNN7EXAMPLE"
secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
if __name__ == '__main__':
Expand Down
23 changes: 16 additions & 7 deletions tests/secrets/test_plugin.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import os
import unittest

from checkov.secrets.plugins.entropy_keyword_combinator import EntropyKeywordCombinator
from checkov.secrets.runner import ENTROPY_KEYWORD_LIMIT


class TestCombinatorPlugin(unittest.TestCase):
def setUp(self) -> None:
self.plugin = EntropyKeywordCombinator(ENTROPY_KEYWORD_LIMIT)
self.plugin = EntropyKeywordCombinator()

def test_positive_value(self):
result = self.plugin.analyze_line("mock.tf", 'api_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMAAAKEY"', 5)
Expand All @@ -30,9 +30,18 @@ def test_popular_kubernetes_manifest_password(self):
self.assertEqual("Base64 High Entropy String", secret.type)
self.assertEqual("bfd3617727eab0e800e62a776c76381defbc4145", secret.secret_hash)

def test_source_code_file_value(self):
def test_no_false_positive_py(self):
# combinator plugin should ignore source code files
result = self.plugin.analyze_line("main.py", 'api_key = "7T)G#dl5}c=T>kf$G3Bon^!R?kzF00"', 1)
self.assertEqual(1, len(result))
secret = result.pop()
self.assertEqual("Secret Keyword", secret.type)
self.assertEqual("93beaa774e56483f19e4fe916ce87e62d4b3ea85", secret.secret_hash)
self.assertEqual(0, len(result))

def test_no_false_positive_yml_1(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
file_name = "secret-no-false-positive.yml"
valid_file_path = current_dir + f"/resources/cfn/{file_name}"
with open(file=valid_file_path) as f:
for i, line in enumerate(f.readlines()):
result = self.plugin.analyze_line(file_name, line, i)
self.assertEqual(0, len(result))


44 changes: 31 additions & 13 deletions tests/secrets/test_runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import unittest

import os
from pathlib import Path

from checkov.common.bridgecrew.check_type import CheckType
from checkov.common.bridgecrew.integration_features.features.policy_metadata_integration import integration as metadata_integration
Expand Down Expand Up @@ -48,7 +47,7 @@ def test_runner_passing_check(self):
valid_dir_path = current_dir + "/resources/terraform"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets'))
runner_filter=RunnerFilter(framework=['secrets']))
self.assertEqual(len(report.passed_checks), 0)
self.assertEqual(report.parsing_errors, [])
self.assertEqual(report.failed_checks, [])
Expand All @@ -60,20 +59,21 @@ def test_runner_tf_failing_check(self):
valid_dir_path = current_dir + "/resources/terraform_failed"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets'))
runner_filter=RunnerFilter(framework=['secrets']))
self.assertEqual(2, len(report.failed_checks))
self.assertEqual(report.parsing_errors, [])
self.assertEqual(report.passed_checks, [])
self.assertEqual(report.skipped_checks, [])
report.print_console()

def test_runner_tf_skip_check(self):
valid_dir_path = Path(__file__).parent / "resources/terraform_skip"
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_dir_path = current_dir + "/resources/terraform_skip"

report = Runner().run(
root_folder=valid_dir_path,
external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets')
runner_filter=RunnerFilter(framework=['secrets'])
)

self.assertEqual(len(report.skipped_checks), 1)
Expand All @@ -88,7 +88,7 @@ def test_runner_specific_check(self):
valid_dir_path = current_dir + "/resources/cfn"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', checks=['CKV_SECRET_2']))
runner_filter=RunnerFilter(framework=['secrets'], checks=['CKV_SECRET_2']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 1)
self.assertEqual(report.parsing_errors, [])
Expand All @@ -99,7 +99,7 @@ def test_runner_wildcard_check(self):
valid_dir_path = current_dir + "/resources/cfn"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', checks=['CKV_SECRET*']))
runner_filter=RunnerFilter(framework=['secrets'], checks=['CKV_SECRET*']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 2)
self.assertEqual(report.parsing_errors, [])
Expand All @@ -110,7 +110,7 @@ def test_runner_skip_check(self):
valid_dir_path = current_dir + "/resources/cfn"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', skip_checks=['CKV_SECRET_2']))
runner_filter=RunnerFilter(framework=['secrets'], skip_checks=['CKV_SECRET_2']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 1)
self.assertEqual(report.failed_checks[0].check_id, 'CKV_SECRET_6')
Expand All @@ -129,7 +129,7 @@ def test_record_has_severity(self):

runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', checks=['CKV_SECRET_2']))
runner_filter=RunnerFilter(framework=['secrets'], checks=['CKV_SECRET_2']))
self.assertEqual(report.failed_checks[0].severity, Severities[BcSeverities.LOW])

def test_runner_check_severity(self):
Expand All @@ -147,7 +147,7 @@ def test_runner_check_severity(self):

runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', checks=['MEDIUM']))
runner_filter=RunnerFilter(framework=['secrets'], checks=['MEDIUM']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 1)
self.assertEqual(report.failed_checks[0].check_id, 'CKV_SECRET_6')
Expand All @@ -169,7 +169,7 @@ def test_runner_skip_check_severity(self):

runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', skip_checks=['MEDIUM']))
runner_filter=RunnerFilter(framework=['secrets'], skip_checks=['MEDIUM']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 1)
self.assertEqual(report.failed_checks[0].check_id, 'CKV_SECRET_6')
Expand All @@ -181,7 +181,7 @@ def test_runner_skip_check_wildcard(self):
valid_dir_path = current_dir + "/resources/cfn"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', skip_checks=['CKV_SECRET*']))
runner_filter=RunnerFilter(framework=['secrets'], skip_checks=['CKV_SECRET*']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 0)
self.assertEqual(report.parsing_errors, [])
Expand Down Expand Up @@ -211,7 +211,7 @@ def test_runner_bc_ids(self):
}

report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets'))
runner_filter=RunnerFilter(framework=['secrets']))
for fc in report.failed_checks:
if fc.check_id == 'CKV_SECRET_2':
self.assertEqual(fc.bc_check_id, 'BC_GIT_2')
Expand Down Expand Up @@ -286,6 +286,7 @@ def test_runner_requested_file_only_dockerfile(self):
enable_secret_scan_all_files=True))
self.assertEqual(len(report.failed_checks), 4)


def test_runner_no_requested_file(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_dir_path = current_dir + "/resources"
Expand All @@ -294,6 +295,23 @@ def test_runner_no_requested_file(self):
runner_filter=RunnerFilter(framework=['secrets']))
self.assertEqual(len(report.failed_checks), 9)

def test_true_positive_py(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_file_path = current_dir + "/resources/file_type/test.py"
runner = Runner()
report = runner.run(root_folder=None, files=[valid_file_path], external_checks_dir=None,
runner_filter=RunnerFilter(framework=['secrets'], enable_secret_scan_all_files=True))
self.assertEqual(len(report.failed_checks), 2)

def test_no_false_positive_yml_2(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_dir_path = current_dir + "/resources/cfn"
valid_file_path = valid_dir_path + "/secret-no-false-positive.yml"
runner = Runner()
report = runner.run(root_folder=None, files=[valid_file_path], external_checks_dir=None,
runner_filter=RunnerFilter(framework=['secrets'],
enable_secret_scan_all_files=True))
self.assertEqual(len(report.failed_checks), 0)

if __name__ == '__main__':
unittest.main()