Skip to content

Commit

Permalink
CveMatcher now has a valid SPDX Document stub that can either be repl…
Browse files Browse the repository at this point in the history
…aced or added to.

Changed the usage of this class to remove the processing from the object creation.  Now have to give the SPDX Document to the object before calling process.
Added method `add_package` to allow adding packages to the internal SPDX Document being used by CveMatcher.
  • Loading branch information
mdingwall committed Jan 7, 2025
1 parent 8ed9f39 commit f670aed
Showing 1 changed file with 116 additions and 30 deletions.
146 changes: 116 additions & 30 deletions ics_sbom_libs/cve_match/cvematcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,17 @@
import pathlib

from rich import table

from beartype.typing import Optional
from cpeparser import CpeParser
from spdx_tools.spdx.model import Document as SPDXDocument
from spdx_tools.spdx.model import ExternalPackageRefCategory
from spdx_tools.spdx.model import CreationInfo as SPDXCreationInfo
from spdx_tools.spdx.model import Package as SPDXPackage
from spdx_tools.spdx.model import Actor as SPDXActor
from spdx_tools.spdx.model import SpdxNoAssertion
from spdx_tools.spdx.model import ActorType as SPDXActorType
from spdx_tools.spdx.model import ExternalPackageRef as SPDXExternalPackageRef
from spdx_tools.spdx.model import ExternalPackageRefCategory as SPDXExternalRefCategory

from ics_sbom_libs.cve_match.package_matching.versionfactory import VersionFactory
from ics_sbom_libs.cve_match.cpe_match_results import CpeMatchResult
Expand All @@ -33,25 +41,103 @@ class MatchTableOutput(Enum):
All = 3


def cpe_factory(product: str, version: Optional[str] = None, vendor: Optional[str] = None):
_vendor = vendor if vendor else "*"
_version = version if version else "*"

return f"cpe:2.3:a:{_vendor}:{product}:{_version}:*:*:*:*:*:*:*"


class CveMatcher:
def __init__(self, spdx_document: SPDXDocument, db_path: pathlib.Path):
self.resultList: list[MatchResult] = []
self.spdx_document = spdx_document
db_path: pathlib.Path
_spdx_document: SPDXDocument

# Results
result_list: list[MatchResult]

# Results meta data
total_package_count: int
dirty_package_count: int
clean_package_count: int
total_cve_count: int

# Timing meta data
scanTime: str

def __init__(self, db_path: pathlib.Path):
self.db_path = db_path
self.totalPackages = len(self.spdx_document.packages)
self.dirtyPackages = 0
self.cleanPackages = 0
self.totalCves = 0

self.resultList = process(self.spdx_document, self.db_path)
self.scanTime = str(datetime.datetime.utcnow()).replace(" ", "T")[:-7] + "Z"
self.total_package_count = 0
self.dirty_package_count = 0
self.clean_package_count = 0
self.total_cve_count = 0

creation_info = SPDXCreationInfo(
spdx_version="SPDX-2.3",
spdx_id="SPDXRef-DOCUMENT",
name="ICS SBOM Package Search",
document_namespace="",
creators=[SPDXActor(SPDXActorType.TOOL, "icsbom")],
created=datetime.datetime.now(datetime.timezone.utc),
)
self._spdx_document = SPDXDocument(creation_info=creation_info)

@property
def spdx_document(self):
return self._spdx_document

@spdx_document.setter
def spdx_document(self, document):
if not isinstance(document, SPDXDocument):
print("[yellow][b]WARNING:[/b][/yellow] The provided document is not an SPDXDocument.")
return

self._spdx_document = document

def add_package(self, package: str, version: Optional[str] = None, vendor: Optional[str] = None):
if len(package) == 0:
print("[red][b]ERROR:[/b] The package name is empty. Not adding to Matcher.[/red]")
return

new_package = SPDXPackage(
name=package,
spdx_id=f"SPDXRef-{package}",
version=version,
supplier=SpdxNoAssertion(),
download_location=SpdxNoAssertion(),
description="No Text",
)

if vendor:
new_package.external_references = [
SPDXExternalPackageRef(
SPDXExternalRefCategory.SECURITY,
"http://spdx.org/rdf/references/cpe23Type",
cpe_factory(package, version, vendor),
)
]

self._spdx_document.packages.append(new_package)

def process(self):
if len(self.spdx_document.packages) == 0:
print("[red][b]ERROR:[/b] No SPDX Document has no packages[/red]")
raise RuntimeError("No SPDX Document has no packages")

self.total_package_count = len(self.spdx_document.packages)
self.dirty_package_count = 0
self.clean_package_count = 0
self.total_cve_count = 0

self.result_list = process(self.spdx_document, self.db_path)
self.scanTime = str(datetime.datetime.now(datetime.timezone.utc)).replace(" ", "T")[:-7] + "Z"

for result in self.resultList:
for result in self.result_list:
if result.cve_list:
self.dirtyPackages += 1
self.totalCves += len(result.cve_list)
self.dirty_package_count += 1
self.total_cve_count += len(result.cve_list)
else:
self.cleanPackages += 1
self.clean_package_count += 1

def create_match_table(self, table_output: MatchTableOutput = MatchTableOutput.All):
match_table = table.Table(title="CVE Results", row_styles=["dim", ""], expand=True)
Expand All @@ -66,7 +152,7 @@ def create_match_table(self, table_output: MatchTableOutput = MatchTableOutput.A
match_table.add_column(header="CVEs")

vuln_info_counts = {"Total": 0, "NONE": 0, "LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0}
for result in self.resultList:
for result in self.result_list:
vuln_info = result.get_severity_info()
if result.cve_list and table_output != MatchTableOutput.WithoutCvesOnly:
sorted_cves = sorted(result.cve_list)
Expand Down Expand Up @@ -119,7 +205,7 @@ def create_match_table(self, table_output: MatchTableOutput = MatchTableOutput.A

def get_severity_info(self):
vuln_info_counts = {"Total": 0, "NONE": 0, "LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0}
for result in self.resultList:
for result in self.result_list:
vuln_info = result.get_severity_info()
vuln_info_counts["Total"] += len(result.cve_list)
for severity in list(vuln_info_counts.keys())[1:]:
Expand All @@ -143,7 +229,7 @@ def process(spdx_document: SPDXDocument, db_path: pathlib.Path):
if use_parallel:
with ProcessPoolExecutor() as executor:
future_to_package = {
executor.submit(process_package, package, db_path): package for package in spdx_document.packages
executor.submit(process_spdx_package, package, db_path): package for package in spdx_document.packages
}
for future in as_completed(future_to_package):
package = future_to_package[future]
Expand All @@ -157,7 +243,7 @@ def process(spdx_document: SPDXDocument, db_path: pathlib.Path):
package_pbar.update()
else:
for package in spdx_document.packages:
result, unique_cpes_partial = process_package(package, db_path)
result, unique_cpes_partial = process_spdx_package(package, db_path)
match_results.update(result)
for cpe, packages in unique_cpes_partial.items():
if cpe not in unique_cpes:
Expand Down Expand Up @@ -326,32 +412,32 @@ def cve_version_included(db: VulnerabilityDatabase, cve_id, package_name, packag
def generate_cpe_list(ext_refs) -> list[str]:
cpe_list: list[str] = []
for ref in ext_refs:
if ref.category is ExternalPackageRefCategory.SECURITY and ref.reference_type.find("/cpe") != -1:
if ref.category is SPDXExternalRefCategory.SECURITY and ref.reference_type.find("/cpe") != -1:
cpe = ref.locator
cpe_list.append(cpe)
return cpe_list


def process_package(package, db_path):
match = MatchResult(name=package.name, version=package.version)
def process_spdx_package(spdx_package, db_path):
match = MatchResult(name=spdx_package.name, version=spdx_package.version)
unique_cpes = {}
match_results = {}

cpes = generate_cpe_list(package.external_references)
cpes = generate_cpe_list(spdx_package.external_references)
if cpes:
for cpe in cpes:
if cpe not in unique_cpes.keys():
unique_cpes[cpe] = [package]
unique_cpes[cpe] = [spdx_package]
else:
unique_cpes[cpe].append(package)
unique_cpes[cpe].append(spdx_package)
match.cpe_list.append(cpe)
else:
looked_up_cpes = lookup_cpe_for_package(package.name, db_path)
new_cpe = CpeParser().parser(f"cpe:2.3:a:*:{package.name}:{package.version}:*:*:*:*:*:*:*")
looked_up_cpes = lookup_cpe_for_package(spdx_package.name, db_path)
new_cpe = CpeParser().parser(cpe_factory(spdx_package.name, spdx_package.version))
if not looked_up_cpes:
new_cpe_str = create_cpe_string(new_cpe)
if new_cpe_str not in unique_cpes:
unique_cpes[new_cpe_str] = [package]
unique_cpes[new_cpe_str] = [spdx_package]
match.cpe_list.append(new_cpe_str)
else:
for cpe in looked_up_cpes:
Expand All @@ -360,11 +446,11 @@ def process_package(package, db_path):
new_cpe["vendor"] = parsed_cpe["vendor"]
new_cpe_str = create_cpe_string(new_cpe)
if new_cpe_str not in unique_cpes.keys():
unique_cpes[new_cpe_str] = [package]
unique_cpes[new_cpe_str] = [spdx_package]
else:
unique_cpes[new_cpe_str].append(package)
unique_cpes[new_cpe_str].append(spdx_package)
if new_cpe_str not in match.cpe_list:
match.cpe_list.append(new_cpe_str)

match_results[package.name] = match
match_results[spdx_package.name] = match
return match_results, unique_cpes

0 comments on commit f670aed

Please sign in to comment.