Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔨 rework kubescape parser #11229

Merged
merged 13 commits into from
Nov 16, 2024
2 changes: 1 addition & 1 deletion dojo/settings/.settings.dist.py.sha256sum
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7a71516d9e6d3fedd26424517ea2c228ad74b6fee8aaa7d8752b8ea4f228aef3
f6cfd7d4048275a8c0af62d3be5957527c6d0d06d5b0a7d89d8c3ec11faffa2d
1 change: 1 addition & 0 deletions dojo/settings/settings.dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -1744,6 +1744,7 @@ def saml2_attrib_map_format(dict):
"ELSA": "https://linux.oracle.com/errata/&&.html", # e.g. https://linux.oracle.com/errata/ELSA-2024-12714.html
"ELBA": "https://linux.oracle.com/errata/&&.html", # e.g. https://linux.oracle.com/errata/ELBA-2024-7457.html
"RXSA": "https://errata.rockylinux.org/", # e.g. https://errata.rockylinux.org/RXSA-2024:4928
"C-": "https://hub.armosec.io/docs/", # e.g. https://hub.armosec.io/docs/c-0085
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems somewhat unlikely that a C- prefix will be completely unique to ARMO... We may need to think in the future about how to better categorize different vulnerability IDs. It's getting increasingly convoluted to handle all these different patterns in this way with display tags.

No action needed on this PR, mostly leaving this as a note for myself and to get others' feedback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right. This C- prefix is used to not mix C with CVE.

"AVD": "https://avd.aquasec.com/misconfig/", # e.g. https://avd.aquasec.com/misconfig/avd-ksv-01010
"KHV": "https://avd.aquasec.com/misconfig/kubernetes/", # e.g. https://avd.aquasec.com/misconfig/kubernetes/khv045
"CAPEC": "https://capec.mitre.org/data/definitions/&&.html", # e.g. https://capec.mitre.org/data/definitions/157.html
Expand Down
2 changes: 1 addition & 1 deletion dojo/templatetags/display_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ def vulnerability_url(vulnerability_id):

for key in settings.VULNERABILITY_URLS:
if vulnerability_id.upper().startswith(key):
if key in ["AVD", "KHV"]:
if key in ["AVD", "KHV", "C-"]:
return settings.VULNERABILITY_URLS[key] + str(vulnerability_id.lower())
if "&&" in settings.VULNERABILITY_URLS[key]:
# Process specific keys specially if need
Expand Down
115 changes: 56 additions & 59 deletions dojo/tools/kubescape/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,65 +59,62 @@ def get_findings(self, filename, test):
controls = results[0].get("controls", [])

for control in controls:
# This condition is true if the result doesn't contain the status for each control (old format)
retrocompatibility_condition = "status" not in control or "status" not in control["status"]
if retrocompatibility_condition or control["status"]["status"] == "failed":
control_name = control["name"]
if resource_type and resource_name and control_name:
title = f"{control_name} - {resource_type} {resource_name}"
else:
title = f"{control_name} - {resourceid}"
controlID = control["controlID"]

# Find control details
controlSummary = self.find_control_summary_by_id(data, controlID)
if controlSummary is None:
severity = "Info"
mitigation = ""
else:
severity = self.severity_mapper(controlSummary.get("scoreFactor", 0))
# Define mitigation if available
if "mitigation" in controlSummary:
mitigation = controlSummary["mitigation"]
for rule in control["rules"]:
# This condition is true if the result doesn't contain the status for each control (old format)
retrocompatibility_condition = "status" not in control or "status" not in control["status"]
if retrocompatibility_condition or control["status"]["status"] == "failed":
control_name = control["name"]
if resource_type and resource_name and control_name:
title = f"{control_name} - {resource_type} {resource_name}"
else:
mitigation = ""

armoLink = f"https://hub.armosec.io/docs/{controlID.lower()}"
description = "**Summary:** " + f"The ressource '{resourceid}' has failed the control '{control_name}'." + "\n"
if controlSummary is not None and "description" in controlSummary:
description += "**Description:** " + controlSummary["description"] + "\n"

# Define category if available
if controlSummary is not None and "category" in controlSummary and "subCategory" in controlSummary["category"]:
category_name = controlSummary["category"]["name"]
category_subname = controlSummary["category"]["subCategory"]["name"]
category = f"{category_name} > {category_subname}"
description += "**Category:** " + category + "\n"
elif controlSummary is not None and "category" in controlSummary and "name" in controlSummary["category"]:
category = controlSummary["category"]["name"]
description += "**Category:** " + category + "\n"

description += "View control details here: " + self.__hyperlink(armoLink)
title = f"{control_name} - {resourceid}"
controlID = control["controlID"]

steps_to_reproduce = "The following rules have failed :" + "\n"
steps_to_reproduce += "\t**Rules:** " + str(json.dumps(control["rules"], indent=4)) + "\n"

steps_to_reproduce += "Resource object may contain evidence:" + "\n"
steps_to_reproduce += "\t**Resource object:** " + str(json.dumps(resource["object"], indent=4))

references = armoLink

find = Finding(
title=textwrap.shorten(title, 150),
test=test,
description=description,
mitigation=mitigation,
steps_to_reproduce=steps_to_reproduce,
references=references,
severity=severity,
component_name=resourceid,
static_finding=True,
dynamic_finding=False,
)
findings.append(find)
# Find control details
controlSummary = self.find_control_summary_by_id(data, controlID)
if controlSummary is None:
severity = "Info"
mitigation = ""
else:
severity = self.severity_mapper(controlSummary.get("scoreFactor", 0))
# Define mitigation if available
if "mitigation" in controlSummary:
mitigation = controlSummary["mitigation"]
else:
mitigation = ""

description = "**Summary:** " + f"The ressource '{resourceid}' has failed the control '{control_name}'." + "\n"
if controlSummary is not None and "description" in controlSummary:
description += "**Description:** " + controlSummary["description"] + "\n"

# Define category if available
if controlSummary is not None and "category" in controlSummary and "subCategory" in controlSummary["category"]:
category_name = controlSummary["category"]["name"]
category_subname = controlSummary["category"]["subCategory"]["name"]
category = f"{category_name} > {category_subname}"
description += "**Category:** " + category + "\n"
elif controlSummary is not None and "category" in controlSummary and "name" in controlSummary["category"]:
category = controlSummary["category"]["name"]
description += "**Category:** " + category + "\n"

steps_to_reproduce = "The following rules have failed :" + "\n"
steps_to_reproduce += "\t**Rules:** " + str(json.dumps(control["rules"], indent=4)) + "\n"
steps_to_reproduce += "Resource object may contain evidence:" + "\n"
steps_to_reproduce += "\t**Resource object:** " + str(json.dumps(resource["object"], indent=4))
if rule["status"] != "passed":
manuel-sommer marked this conversation as resolved.
Show resolved Hide resolved
find = Finding(
title=textwrap.shorten(title, 150),
test=test,
description=description,
mitigation=mitigation,
steps_to_reproduce=steps_to_reproduce,
severity=severity,
component_name=resourceid,
static_finding=True,
dynamic_finding=False,
manuel-sommer marked this conversation as resolved.
Show resolved Hide resolved
)
findings.append(find)
if controlID is not None:
find.unsaved_vulnerability_ids = []
find.unsaved_vulnerability_ids.append(controlID)
manuel-sommer marked this conversation as resolved.
Show resolved Hide resolved
return findings
2 changes: 1 addition & 1 deletion unittests/tools/test_kubescape_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def test_parse_file_has_many_findings(self):
with open(get_unit_tests_path() + "/scans/kubescape/many_findings.json", encoding="utf-8") as testfile:
parser = KubescapeParser()
findings = parser.get_findings(testfile, Test())
self.assertEqual(710, len(findings))
self.assertEqual(349, len(findings))

def test_parse_file_has_many_results(self):
with open(get_unit_tests_path() + "/scans/kubescape/results.json", encoding="utf-8") as testfile:
Expand Down