Skip to content

Commit

Permalink
refactor get_weaknesses function in apache_httpd importer
Browse files Browse the repository at this point in the history
Signed-off-by: ambuj <[email protected]>
  • Loading branch information
ambuj-1211 committed Sep 15, 2024
1 parent 451f55b commit 229b70a
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 66 deletions.
163 changes: 101 additions & 62 deletions vulnerabilities/importers/apache_httpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,67 +161,106 @@ def fetch_links(url):


def get_weaknesses(cve_data):
"""
Extract CWE IDs from CVE data.
Args:
cve_data (dict): The CVE data in a dictionary format.
Returns:
List[int]: A list of unique CWE IDs.
>>> mock_cve_data = {
... "containers": {
... "cna": {
... "providerMetadata": {
... "orgId": "f0158376-9dc2-43b6-827c-5f631a4d8d09"
... },
... "title": "mod_macro buffer over-read",
... "problemTypes": [
... {
... "descriptions": [
... {
... "description": "CWE-125 Out-of-bounds Read",
... "lang": "en",
... "cweId": "CWE-125",
... "type": "CWE"
... }
... ]
... }
... ]
... }
... }
... }
>>> get_weaknesses(mock_cve_data)
[125]
"""
problem_types = cve_data.get("containers", {}).get("cna", {}).get("problemTypes", [])
descriptions = problem_types[0].get("descriptions", []) if len(problem_types) > 0 else []
cwe_string = descriptions[0].get("cweId", "") if len(descriptions) > 0 else ""
cwe_pattern = r"CWE-\d+"
description = descriptions[0].get("description", "") if len(descriptions) > 0 else ""
matches = re.findall(cwe_pattern, description)
# """
# Extract CWE IDs from CVE data.

# Args:
# cve_data (dict): The CVE data in a dictionary format.

# Returns:
# List[int]: A list of unique CWE IDs.

# Examples:
# >>> mock_cve_data1 = {
# ... "containers": {
# ... "cna": {
# ... "providerMetadata": {
# ... "orgId": "f0158376-9dc2-43b6-827c-5f631a4d8d09"
# ... },
# ... "title": "mod_macro buffer over-read",
# ... "problemTypes": [
# ... {
# ... "descriptions": [
# ... {
# ... "description": "CWE-125 Out-of-bounds Read",
# ... "lang": "en",
# ... "cweId": "CWE-125",
# ... "type": "CWE"
# ... }
# ... ]
# ... }
# ... ]
# ... }
# ... }
# ... }
# >>> mock_cve_data2 = {
# ... "data_type": "CVE",
# ... "data_format": "MITRE",
# ... "data_version": "4.0",
# ... "generator": {
# ... "engine": "Vulnogram 0.0.9"
# ... },
# ... "CVE_data_meta": {
# ... "ID": "CVE-2022-28614",
# ... "ASSIGNER": "[email protected]",
# ... "TITLE": "read beyond bounds via ap_rwrite() ",
# ... "STATE": "PUBLIC"
# ... },
# ... "problemtype": {
# ... "problemtype_data": [
# ... {
# ... "description": [
# ... {
# ... "lang": "eng",
# ... "value": "CWE-190 Integer Overflow or Wraparound"
# ... }
# ... ]
# ... },
# ... {
# ... "description": [
# ... {
# ... "lang": "eng",
# ... "value": "CWE-200 Exposure of Sensitive Information to an Unauthorized Actor"
# ... }
# ... ]
# ... }
# ... ]
# ... }
# ... }

# >>> get_weaknesses(mock_cve_data1)
# [125]

# >>> get_weaknesses(mock_cve_data2)
# [190, 200]
# """

alias = get_item(cve_data, "CVE_data_meta", "ID")
cwe_id = []
db = Database()
if alias:
problemtype_data = get_item(cve_data, "problemtype", "problemtype_data") or []
for problem in problemtype_data:
for desc in problem["description"]:
value = desc.get("value", "")
cwe_pattern = r"CWE-\d+"
cwe_id_string_list = re.findall(cwe_pattern, value)
for cwe_id_string in cwe_id_string_list:
cwe_id.append(get_cwe_id(cwe_id_string))

else:
problemTypes = cve_data.get("containers", {}).get("cna", {}).get("problemTypes", [])
descriptions = problemTypes[0].get("descriptions", []) if len(problemTypes) > 0 else []
for description in descriptions:
cwe_id_string = description.get("cweId", "")
cwe_id.append(get_cwe_id(cwe_id_string))

weaknesses = []
cwe_string_from_description = ""
if matches:
cwe_string_from_description = matches[0]
if cwe_string or cwe_string_from_description:
if cwe_string:
cwe_id = get_cwe_id(cwe_string)
try:
db.get(cwe_id)
weaknesses.append(cwe_id)
except Exception:
logger.error("Invalid CWE id")
elif cwe_string_from_description:
cwe_id = get_cwe_id(cwe_string_from_description)
try:
db.get(cwe_id)
weaknesses.append(cwe_id)
except Exception:
logger.error("Invalid CWE id")

seen = set()
unique_cwe = [x for x in weaknesses if not (x in seen or seen.add(x))]
return unique_cwe
for cwe in cwe_id:
try:
db.get(cwe)
weaknesses.append(cwe)
except Exception:
logger.error("Invalid CWE id")

return weaknesses
2 changes: 2 additions & 0 deletions vulnerabilities/tests/test_apache_httpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def test_to_advisory_CVE_2021_44224():
advisories = ApacheHTTPDImporter().to_advisory(data)
result = advisories.to_dict()
expected_file = os.path.join(TEST_DATA, f"CVE-2021-44224-apache-httpd-expected.json")
print(f"2021 {result}")
util_tests.check_results_against_json(result, expected_file)


Expand All @@ -119,6 +120,7 @@ def test_to_advisory_CVE_2022_28614():
advisories = ApacheHTTPDImporter().to_advisory(data)
result = advisories.to_dict()
expected_file = os.path.join(TEST_DATA, f"CVE-2022-28614-apache-httpd-expected.json")
print(f"2022 {result}")
util_tests.check_results_against_json(result, expected_file)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@
}
],
"date_published": null,
"weaknesses": [],
"weaknesses": [476],
"url": "https://httpd.apache.org/security/json/CVE-2021-44224.json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@
}
],
"date_published": null,
"weaknesses": [],
"weaknesses": [190, 200],
"url": "https://httpd.apache.org/security/json/CVE-2022-28614.json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
]
}
],
"weaknesses": []
"weaknesses": [476]
},
{
"vulnerability_id": null,
Expand Down Expand Up @@ -103,6 +103,6 @@
]
}
],
"weaknesses": []
"weaknesses": [476]
}
]

0 comments on commit 229b70a

Please sign in to comment.