From 06ff84f4908e4138fad9f4657a009d616e246edb Mon Sep 17 00:00:00 2001 From: Dmitrii Mariushkin Date: Thu, 28 Nov 2024 05:10:52 +0300 Subject: [PATCH] Fix sarif parser location files processing (#11265) * Fix sarif parser locations files processing * Fix tests * linter fixes * fix snippet for each file hit * fix snippet --- dojo/tools/sarif/parser.py | 268 ++++++++++++++------------- unittests/tools/test_sarif_parser.py | 4 +- 2 files changed, 145 insertions(+), 127 deletions(-) diff --git a/dojo/tools/sarif/parser.py b/dojo/tools/sarif/parser.py index ae9736e9a0..e2bcae845f 100644 --- a/dojo/tools/sarif/parser.py +++ b/dojo/tools/sarif/parser.py @@ -61,9 +61,9 @@ def __get_items_from_run(self, run): # get the timestamp of the run if possible run_date = self.__get_last_invocation_date(run) for result in run.get("results", []): - item = get_item(result, rules, artifacts, run_date) - if item is not None: - items.append(item) + result_items = get_items_from_result(result, rules, artifacts, run_date) + if result_items: + items.extend(result_items) return items def __get_last_invocation_date(self, data): @@ -189,34 +189,35 @@ def get_title(result, rule): return textwrap.shorten(title, 150) -def get_snippet(result): +def get_snippet(location): + snippet = None - if "locations" in result: - location = result["locations"][0] - if "physicalLocation" in location: - if "region" in location["physicalLocation"]: - if "snippet" in location["physicalLocation"]["region"]: - if ( - "text" - in location["physicalLocation"]["region"]["snippet"] - ): - snippet = location["physicalLocation"]["region"][ - "snippet" - ]["text"] - if ( - snippet is None - and "contextRegion" in location["physicalLocation"] - ): - if "snippet" in location["physicalLocation"]["contextRegion"]: - if ( - "text" - in location["physicalLocation"]["contextRegion"][ - "snippet" - ] - ): - snippet = location["physicalLocation"][ - "contextRegion" - ]["snippet"]["text"] + + if location and "physicalLocation" in location: + if "region" in location["physicalLocation"]: + if "snippet" in location["physicalLocation"]["region"]: + if ( + "text" + in location["physicalLocation"]["region"]["snippet"] + ): + snippet = location["physicalLocation"]["region"][ + "snippet" + ]["text"] + if ( + snippet is None + and "contextRegion" in location["physicalLocation"] + ): + if "snippet" in location["physicalLocation"]["contextRegion"]: + if ( + "text" + in location["physicalLocation"]["contextRegion"][ + "snippet" + ] + ): + snippet = location["physicalLocation"][ + "contextRegion" + ]["snippet"]["text"] + return snippet @@ -264,7 +265,7 @@ def get_codeFlowsDescription(codeFlows): return description -def get_description(result, rule): +def get_description(result, rule, location): description = "" message = "" if "message" in result: @@ -272,8 +273,8 @@ def get_description(result, rule): result["message"], rule, ) description += f"**Result message:** {message}\n" - if get_snippet(result) is not None: - description += f"**Snippet:**\n```\n{get_snippet(result)}\n```\n" + if get_snippet(location) is not None: + description += f"**Snippet:**\n```\n{get_snippet(location)}\n```\n" if rule is not None: if "name" in rule: description += f"**{_('Rule name')}:** {rule.get('name')}\n" @@ -351,7 +352,7 @@ def get_severity(result, rule): return "Medium" -def get_item(result, rules, artifacts, run_date): +def get_items_from_result(result, rules, artifacts, run_date): # see # https://docs.oasis-open.org/sarif/sarif/v2.1.0/csprd01/sarif-v2.1.0-csprd01.html # / 3.27.9 @@ -366,100 +367,117 @@ def get_item(result, rules, artifacts, run_date): if result.get("suppressions"): suppressed = True - # if there is a location get it - file_path = None - line = None + # if there is a location get all files into files list + + files = [] + if "locations" in result: - location = result["locations"][0] - if "physicalLocation" in location: - file_path = location["physicalLocation"]["artifactLocation"]["uri"] - - # 'region' attribute is optionnal - if "region" in location["physicalLocation"]: - # https://docs.oasis-open.org/sarif/sarif/v2.0/csprd02/sarif-v2.0-csprd02.html / 3.30.1 - # need to check whether it is byteOffset - if "byteOffset" in location["physicalLocation"]["region"]: - pass - else: - line = location["physicalLocation"]["region"]["startLine"] - - # test rule link - rule = rules.get(result.get("ruleId")) - - finding = Finding( - title=get_title(result, rule), - severity=get_severity(result, rule), - description=get_description(result, rule), - static_finding=True, # by definition - dynamic_finding=False, # by definition - false_p=suppressed, - active=not suppressed, - file_path=file_path, - line=line, - references=get_references(rule), - ) - - if "ruleId" in result: - finding.vuln_id_from_tool = result["ruleId"] - # for now we only support when the id of the rule is a CVE - if cve_try(result["ruleId"]): - finding.unsaved_vulnerability_ids = [cve_try(result["ruleId"])] - # some time the rule id is here but the tool doesn't define it - if rule is not None: - cwes_extracted = get_rule_cwes(rule) - if len(cwes_extracted) > 0: - finding.cwe = cwes_extracted[-1] - - # Some tools such as GitHub or Grype return the severity in properties - # instead - if "properties" in rule and "security-severity" in rule["properties"]: - try: - cvss = float(rule["properties"]["security-severity"]) - severity = cvss_to_severity(cvss) - finding.cvssv3_score = cvss - finding.severity = severity - except ValueError: - if rule["properties"]["security-severity"].lower().capitalize() in ["Info", "Low", "Medium", "High", "Critical"]: - finding.severity = rule["properties"]["security-severity"].lower().capitalize() - else: - finding.severity = "Info" - - # manage the case that some tools produce CWE as properties of the result - cwes_properties_extracted = get_result_cwes_properties(result) - if len(cwes_properties_extracted) > 0: - finding.cwe = cwes_properties_extracted[-1] - - # manage fixes provided in the report - if "fixes" in result: - finding.mitigation = "\n".join( - [fix.get("description", {}).get("text") for fix in result["fixes"]], - ) + for location in result["locations"]: + + file_path = None + line = None + + if "physicalLocation" in location: + file_path = location["physicalLocation"]["artifactLocation"]["uri"] + + # 'region' attribute is optionnal + if "region" in location["physicalLocation"]: + # https://docs.oasis-open.org/sarif/sarif/v2.0/csprd02/sarif-v2.0-csprd02.html / 3.30.1 + # need to check whether it is byteOffset + if "byteOffset" in location["physicalLocation"]["region"]: + pass + else: + line = location["physicalLocation"]["region"]["startLine"] - if run_date: - finding.date = run_date - - # manage tags provided in the report and rule and remove duplicated - tags = list(set(get_properties_tags(rule) + get_properties_tags(result))) - tags = [s.removeprefix("external/cwe/") for s in tags] - finding.tags = tags - - # manage fingerprints - # fingerprinting in SARIF is more complete than in current implementation - # SARIF standard make it possible to have multiple version in the same report - # for now we just take the first one and keep the format to be able to - # compare it - if result.get("fingerprints"): - hashes = get_fingerprints_hashes(result["fingerprints"]) - first_item = next(iter(hashes.items())) - finding.unique_id_from_tool = first_item[1]["value"] - elif result.get("partialFingerprints"): - # for this one we keep an order to have id that could be compared - hashes = get_fingerprints_hashes(result["partialFingerprints"]) - sorted_hashes = sorted(hashes.keys()) - finding.unique_id_from_tool = "|".join( - [f'{key}:{hashes[key]["value"]}' for key in sorted_hashes], + files.append((file_path, line, location)) + + if not files: + files.append((None, None, None)) + + result_items = [] + + for file_path, line, location in files: + + # test rule link + rule = rules.get(result.get("ruleId")) + + finding = Finding( + title=get_title(result, rule), + severity=get_severity(result, rule), + description=get_description(result, rule, location), + static_finding=True, # by definition + dynamic_finding=False, # by definition + false_p=suppressed, + active=not suppressed, + file_path=file_path, + line=line, + references=get_references(rule), ) - return finding + + if "ruleId" in result: + finding.vuln_id_from_tool = result["ruleId"] + # for now we only support when the id of the rule is a CVE + if cve_try(result["ruleId"]): + finding.unsaved_vulnerability_ids = [cve_try(result["ruleId"])] + # some time the rule id is here but the tool doesn't define it + if rule is not None: + cwes_extracted = get_rule_cwes(rule) + if len(cwes_extracted) > 0: + finding.cwe = cwes_extracted[-1] + + # Some tools such as GitHub or Grype return the severity in properties + # instead + if "properties" in rule and "security-severity" in rule["properties"]: + try: + cvss = float(rule["properties"]["security-severity"]) + severity = cvss_to_severity(cvss) + finding.cvssv3_score = cvss + finding.severity = severity + except ValueError: + if rule["properties"]["security-severity"].lower().capitalize() in ["Info", "Low", "Medium", "High", "Critical"]: + finding.severity = rule["properties"]["security-severity"].lower().capitalize() + else: + finding.severity = "Info" + + # manage the case that some tools produce CWE as properties of the result + cwes_properties_extracted = get_result_cwes_properties(result) + if len(cwes_properties_extracted) > 0: + finding.cwe = cwes_properties_extracted[-1] + + # manage fixes provided in the report + if "fixes" in result: + finding.mitigation = "\n".join( + [fix.get("description", {}).get("text") for fix in result["fixes"]], + ) + + if run_date: + finding.date = run_date + + # manage tags provided in the report and rule and remove duplicated + tags = list(set(get_properties_tags(rule) + get_properties_tags(result))) + tags = [s.removeprefix("external/cwe/") for s in tags] + finding.tags = tags + + # manage fingerprints + # fingerprinting in SARIF is more complete than in current implementation + # SARIF standard make it possible to have multiple version in the same report + # for now we just take the first one and keep the format to be able to + # compare it + if result.get("fingerprints"): + hashes = get_fingerprints_hashes(result["fingerprints"]) + first_item = next(iter(hashes.items())) + finding.unique_id_from_tool = first_item[1]["value"] + elif result.get("partialFingerprints"): + # for this one we keep an order to have id that could be compared + hashes = get_fingerprints_hashes(result["partialFingerprints"]) + sorted_hashes = sorted(hashes.keys()) + finding.unique_id_from_tool = "|".join( + [f'{key}:{hashes[key]["value"]}' for key in sorted_hashes], + ) + + result_items.append(finding) + + return result_items def get_fingerprints_hashes(values): diff --git a/unittests/tools/test_sarif_parser.py b/unittests/tools/test_sarif_parser.py index aafa91d09e..62fadbf3fc 100644 --- a/unittests/tools/test_sarif_parser.py +++ b/unittests/tools/test_sarif_parser.py @@ -238,7 +238,7 @@ def test_njsscan(self): with open(path.join(path.dirname(__file__), "../scans/sarif/njsscan.sarif"), encoding="utf-8") as testfile: parser = SarifParser() findings = parser.get_findings(testfile, Test()) - self.assertEqual(2, len(findings)) + self.assertEqual(3, len(findings)) # finding 0 finding = findings[0] self.assertEqual( @@ -313,7 +313,7 @@ def test_mobsfscan(self): with open(path.join(path.dirname(__file__), "../scans/sarif/mobsfscan.json"), encoding="utf-8") as testfile: parser = SarifParser() findings = parser.get_findings(testfile, Test()) - self.assertEqual(9, len(findings)) + self.assertEqual(18, len(findings)) for finding in findings: self.common_checks(finding)