Skip to content

Commit

Permalink
fix(secrets): change entropy limit in Combinator plugin (bridgecrewio…
Browse files Browse the repository at this point in the history
…#3575)

* inital try - 1 test fails

* changed the example in the false positive test

* indicate the failed test

* adjusted limits for relevant files

* removed unnessary added plugins

* moved both limits to be together

* remove comment

* change combinator plugin to examine only non-source-code files

* undo the last change to combinator plugin

* changed the combinator to combine keywords only in Iac, in sc files scan only with high entropy

* remove commented code

* improved statement
  • Loading branch information
marynaKK authored Sep 29, 2022
1 parent 6dfbed0 commit c3bae90
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 78 deletions.
26 changes: 14 additions & 12 deletions checkov/secrets/plugins/entropy_keyword_combinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
from detect_secrets.util.code_snippet import CodeSnippet

MAX_LINE_LENGTH = 10000
ENTROPY_KEYWORD_COMBINATOR_LIMIT = 3
ENTROPY_KEYWORD_LIMIT = 4.5


class EntropyKeywordCombinator(BasePlugin):
secret_type = "" # nosec # noqa: CCE003 # a static attribute

def __init__(self, limit: float) -> None:
def __init__(self, limit: float = ENTROPY_KEYWORD_LIMIT) -> None:
iac_limit = ENTROPY_KEYWORD_COMBINATOR_LIMIT
self.high_entropy_scanners_iac = (Base64HighEntropyString(limit=iac_limit), HexHighEntropyString(limit=iac_limit))
self.high_entropy_scanners = (Base64HighEntropyString(limit=limit), HexHighEntropyString(limit=limit))
self.keyword_scanner = KeywordDetector()

Expand All @@ -34,18 +38,16 @@ def analyze_line(
one of the entropy scanners find a match (on a line which was already matched by keyword plugin) - it is returned.
for source code files run and merge the two plugins.
"""
entropy_matches = set()
is_iac = f".{filename.split('.')[-1]}" not in SOURCE_CODE_EXTENSION
if len(line) <= MAX_LINE_LENGTH:
keyword_matches = self.keyword_scanner.analyze_line(filename, line, line_number, **kwargs)
if f".{filename.split('.')[-1]}" in SOURCE_CODE_EXTENSION:
for entropy_scanner in self.high_entropy_scanners:
matches = entropy_scanner.analyze_line(filename, line, line_number, **kwargs)
if matches:
entropy_matches = matches
break
keyword_entropy = keyword_matches.union(entropy_matches)
return keyword_entropy
if keyword_matches:
if is_iac:
keyword_matches = self.keyword_scanner.analyze_line(filename, line, line_number, **kwargs)
if keyword_matches:
for entropy_scanner in self.high_entropy_scanners_iac:
matches = entropy_scanner.analyze_line(filename, line, line_number, **kwargs)
if matches:
return matches
else:
for entropy_scanner in self.high_entropy_scanners:
matches = entropy_scanner.analyze_line(filename, line, line_number, **kwargs)
if matches:
Expand Down
63 changes: 17 additions & 46 deletions checkov/secrets/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
}
CHECK_ID_TO_SECRET_TYPE = {v: k for k, v in SECRET_TYPE_TO_ID.items()}

ENTROPY_KEYWORD_LIMIT = 3
PROHIBITED_FILES = ['Pipfile.lock', 'yarn.lock', 'package-lock.json', 'requirements.txt']

MAX_FILE_SIZE = int(os.getenv('CHECKOV_MAX_FILE_SIZE', '5000000')) # 5 MB is default limit
Expand All @@ -69,62 +68,34 @@
class Runner(BaseRunner[None]):
check_type = CheckType.SECRETS # noqa: CCE003 # a static attribute

def run(
self,
def run(self,
root_folder: str | None,
external_checks_dir: list[str] | None = None,
files: list[str] | None = None,
runner_filter: RunnerFilter | None = None,
collect_skip_comments: bool = True
) -> Report:
) -> Report:
runner_filter = runner_filter or RunnerFilter()
current_dir = Path(__file__).parent
secrets = SecretsCollection()
plugins_used = \
[
{
'name': 'AWSKeyDetector'
},
{
'name': 'ArtifactoryDetector'
},
{
'name': 'AzureStorageKeyDetector'
},
{
'name': 'BasicAuthDetector'
},
{
'name': 'CloudantDetector'
},
{
'name': 'IbmCloudIamDetector'
},
{
'name': 'MailchimpDetector'
},
{
'name': 'PrivateKeyDetector'
},
{
'name': 'SlackDetector'
},
{
'name': 'SoftlayerDetector'
},
{
'name': 'SquareOAuthDetector'
},
{
'name': 'StripeDetector'
},
{
'name': 'TwilioKeyDetector'
},
{'name': 'AWSKeyDetector'},
{'name': 'ArtifactoryDetector'},
{'name': 'AzureStorageKeyDetector'},
{'name': 'BasicAuthDetector'},
{'name': 'CloudantDetector'},
{'name': 'IbmCloudIamDetector'},
{'name': 'MailchimpDetector'},
{'name': 'PrivateKeyDetector'},
{'name': 'SlackDetector'},
{'name': 'SoftlayerDetector'},
{'name': 'SquareOAuthDetector'},
{'name': 'StripeDetector'},
{'name': 'TwilioKeyDetector'},
{
'name': 'EntropyKeywordCombinator',
'path': f'file://{current_dir}/plugins/entropy_keyword_combinator.py',
'limit': ENTROPY_KEYWORD_LIMIT
'path': f'file://{current_dir}/plugins/entropy_keyword_combinator.py'
}
]
custom_plugins = os.getenv("CHECKOV_CUSTOM_DETECTOR_PLUGINS_PATH")
Expand Down Expand Up @@ -284,4 +255,4 @@ def search_for_suppression(
"result": CheckResult.SKIPPED,
"suppress_comment": skip_search.group(3)[1:] if skip_search.group(3) else "No comment provided"
}
return None
return None
22 changes: 22 additions & 0 deletions tests/secrets/resources/cfn/secret-no-false-positive.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
no False Positive - where it's not an actual secret
"""

CleanBucketFunction:
Type: "AWS::Lambda::Function"
DependsOn: CleanupRole
Properties:
Handler: index.clearS3Bucket
Role:
Fn::GetAtt: CleanupRole.Arn
Runtime: nodejs12.x
Timeout: 25
Code:
ZipFile: |
no False Positive - where it's not an actual secret
check_metadata_values = ('bafadssda$#%2', 'bdfsver#$@%')
CHECKOV_METADATA_RESULT = 'checkov_results5243gvr'
check1 = {'blabla': 'blabla1'}
check2 = {'blabla': 'blabla2'}
check1['some_key_1235#$@'] = check2.get('some_value_1235')
not_a_secr_k = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
8 changes: 8 additions & 0 deletions tests/secrets/resources/file_type/test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# no False Positive - where it's not an actual secret
check_metadata_values = ('bafadssda$#%2', 'bdfsver#$@%')
CHECKOV_METADATA_RESULT = 'checkov_results5243gvr'
check1 = {'blabla': 'blabla1'}
check2 = {'blabla': 'blabla2'}
check1['some_key_1235#$@'] = check2.get('some_value_1235')


access_key = "AKIAIOSFODNN7EXAMPLE"
secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
if __name__ == '__main__':
Expand Down
23 changes: 16 additions & 7 deletions tests/secrets/test_plugin.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import os
import unittest

from checkov.secrets.plugins.entropy_keyword_combinator import EntropyKeywordCombinator
from checkov.secrets.runner import ENTROPY_KEYWORD_LIMIT


class TestCombinatorPlugin(unittest.TestCase):
def setUp(self) -> None:
self.plugin = EntropyKeywordCombinator(ENTROPY_KEYWORD_LIMIT)
self.plugin = EntropyKeywordCombinator()

def test_positive_value(self):
result = self.plugin.analyze_line("mock.tf", 'api_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMAAAKEY"', 5)
Expand All @@ -30,9 +30,18 @@ def test_popular_kubernetes_manifest_password(self):
self.assertEqual("Base64 High Entropy String", secret.type)
self.assertEqual("bfd3617727eab0e800e62a776c76381defbc4145", secret.secret_hash)

def test_source_code_file_value(self):
def test_no_false_positive_py(self):
# combinator plugin should ignore source code files
result = self.plugin.analyze_line("main.py", 'api_key = "7T)G#dl5}c=T>kf$G3Bon^!R?kzF00"', 1)
self.assertEqual(1, len(result))
secret = result.pop()
self.assertEqual("Secret Keyword", secret.type)
self.assertEqual("93beaa774e56483f19e4fe916ce87e62d4b3ea85", secret.secret_hash)
self.assertEqual(0, len(result))

def test_no_false_positive_yml_1(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
file_name = "secret-no-false-positive.yml"
valid_file_path = current_dir + f"/resources/cfn/{file_name}"
with open(file=valid_file_path) as f:
for i, line in enumerate(f.readlines()):
result = self.plugin.analyze_line(file_name, line, i)
self.assertEqual(0, len(result))


44 changes: 31 additions & 13 deletions tests/secrets/test_runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import unittest

import os
from pathlib import Path

from checkov.common.bridgecrew.check_type import CheckType
from checkov.common.bridgecrew.integration_features.features.policy_metadata_integration import integration as metadata_integration
Expand Down Expand Up @@ -48,7 +47,7 @@ def test_runner_passing_check(self):
valid_dir_path = current_dir + "/resources/terraform"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets'))
runner_filter=RunnerFilter(framework=['secrets']))
self.assertEqual(len(report.passed_checks), 0)
self.assertEqual(report.parsing_errors, [])
self.assertEqual(report.failed_checks, [])
Expand All @@ -60,20 +59,21 @@ def test_runner_tf_failing_check(self):
valid_dir_path = current_dir + "/resources/terraform_failed"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets'))
runner_filter=RunnerFilter(framework=['secrets']))
self.assertEqual(2, len(report.failed_checks))
self.assertEqual(report.parsing_errors, [])
self.assertEqual(report.passed_checks, [])
self.assertEqual(report.skipped_checks, [])
report.print_console()

def test_runner_tf_skip_check(self):
valid_dir_path = Path(__file__).parent / "resources/terraform_skip"
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_dir_path = current_dir + "/resources/terraform_skip"

report = Runner().run(
root_folder=valid_dir_path,
external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets')
runner_filter=RunnerFilter(framework=['secrets'])
)

self.assertEqual(len(report.skipped_checks), 1)
Expand All @@ -88,7 +88,7 @@ def test_runner_specific_check(self):
valid_dir_path = current_dir + "/resources/cfn"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', checks=['CKV_SECRET_2']))
runner_filter=RunnerFilter(framework=['secrets'], checks=['CKV_SECRET_2']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 1)
self.assertEqual(report.parsing_errors, [])
Expand All @@ -99,7 +99,7 @@ def test_runner_wildcard_check(self):
valid_dir_path = current_dir + "/resources/cfn"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', checks=['CKV_SECRET*']))
runner_filter=RunnerFilter(framework=['secrets'], checks=['CKV_SECRET*']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 2)
self.assertEqual(report.parsing_errors, [])
Expand All @@ -110,7 +110,7 @@ def test_runner_skip_check(self):
valid_dir_path = current_dir + "/resources/cfn"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', skip_checks=['CKV_SECRET_2']))
runner_filter=RunnerFilter(framework=['secrets'], skip_checks=['CKV_SECRET_2']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 1)
self.assertEqual(report.failed_checks[0].check_id, 'CKV_SECRET_6')
Expand All @@ -129,7 +129,7 @@ def test_record_has_severity(self):

runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', checks=['CKV_SECRET_2']))
runner_filter=RunnerFilter(framework=['secrets'], checks=['CKV_SECRET_2']))
self.assertEqual(report.failed_checks[0].severity, Severities[BcSeverities.LOW])

def test_runner_check_severity(self):
Expand All @@ -147,7 +147,7 @@ def test_runner_check_severity(self):

runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', checks=['MEDIUM']))
runner_filter=RunnerFilter(framework=['secrets'], checks=['MEDIUM']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 1)
self.assertEqual(report.failed_checks[0].check_id, 'CKV_SECRET_6')
Expand All @@ -169,7 +169,7 @@ def test_runner_skip_check_severity(self):

runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', skip_checks=['MEDIUM']))
runner_filter=RunnerFilter(framework=['secrets'], skip_checks=['MEDIUM']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 1)
self.assertEqual(report.failed_checks[0].check_id, 'CKV_SECRET_6')
Expand All @@ -181,7 +181,7 @@ def test_runner_skip_check_wildcard(self):
valid_dir_path = current_dir + "/resources/cfn"
runner = Runner()
report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets', skip_checks=['CKV_SECRET*']))
runner_filter=RunnerFilter(framework=['secrets'], skip_checks=['CKV_SECRET*']))
self.assertEqual(len(report.skipped_checks), 0)
self.assertEqual(len(report.failed_checks), 0)
self.assertEqual(report.parsing_errors, [])
Expand Down Expand Up @@ -211,7 +211,7 @@ def test_runner_bc_ids(self):
}

report = runner.run(root_folder=valid_dir_path, external_checks_dir=None,
runner_filter=RunnerFilter(framework='secrets'))
runner_filter=RunnerFilter(framework=['secrets']))
for fc in report.failed_checks:
if fc.check_id == 'CKV_SECRET_2':
self.assertEqual(fc.bc_check_id, 'BC_GIT_2')
Expand Down Expand Up @@ -286,6 +286,7 @@ def test_runner_requested_file_only_dockerfile(self):
enable_secret_scan_all_files=True))
self.assertEqual(len(report.failed_checks), 4)


def test_runner_no_requested_file(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_dir_path = current_dir + "/resources"
Expand All @@ -294,6 +295,23 @@ def test_runner_no_requested_file(self):
runner_filter=RunnerFilter(framework=['secrets']))
self.assertEqual(len(report.failed_checks), 9)

def test_true_positive_py(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_file_path = current_dir + "/resources/file_type/test.py"
runner = Runner()
report = runner.run(root_folder=None, files=[valid_file_path], external_checks_dir=None,
runner_filter=RunnerFilter(framework=['secrets'], enable_secret_scan_all_files=True))
self.assertEqual(len(report.failed_checks), 2)

def test_no_false_positive_yml_2(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
valid_dir_path = current_dir + "/resources/cfn"
valid_file_path = valid_dir_path + "/secret-no-false-positive.yml"
runner = Runner()
report = runner.run(root_folder=None, files=[valid_file_path], external_checks_dir=None,
runner_filter=RunnerFilter(framework=['secrets'],
enable_secret_scan_all_files=True))
self.assertEqual(len(report.failed_checks), 0)

if __name__ == '__main__':
unittest.main()

0 comments on commit c3bae90

Please sign in to comment.