Skip to content

Commit

Permalink
feat: add hyperscan support
Browse files Browse the repository at this point in the history
hyperscan will run simultaneously all version checkers on a file which
reduce processing time.

hyperscan depends on python >= 3.8 however python 3.7 will not have any
security support after 27 Jun 2023: https://endoflife.date/python

pyperscan package is used instead of the most well-known hyperscan
package as pyperscan allows to add a tag for each pattern. This feature
will allow to retrieve easily the checker associated to the matched
pattern.

Fix #2485

Signed-off-by: Fabrice Fontaine <[email protected]>
  • Loading branch information
ffontaine committed Feb 15, 2023
1 parent 93c110c commit 00a6ded
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 25 deletions.
37 changes: 23 additions & 14 deletions cve_bin_tool/checkers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,11 @@ def __new__(cls, name, bases, props):
f"Checker {name} has a VENDOR_PRODUCT string that is not lowercase"
)
# Compile regex
cls.CONTAINS_PATTERNS = list(map(re.compile, cls.CONTAINS_PATTERNS))
cls.VERSION_PATTERNS = list(map(re.compile, cls.VERSION_PATTERNS))
cls.FILENAME_PATTERNS = list(map(re.compile, cls.FILENAME_PATTERNS))
cls.CONTAINS_PATTERNS.extend(cls.VERSION_PATTERNS)
cls.REGEX_CONTAINS_PATTERNS = list(map(re.compile, cls.CONTAINS_PATTERNS))
cls.REGEX_VERSION_PATTERNS = list(map(re.compile, cls.VERSION_PATTERNS))
cls.REGEX_FILENAME_PATTERNS = list(map(re.compile, cls.FILENAME_PATTERNS))
cls.REGEX_CONTAINS_PATTERNS.extend(cls.REGEX_VERSION_PATTERNS)
cls.version_info = dict()
# Return the new checker class
return cls

Expand All @@ -332,23 +333,31 @@ class Checker(metaclass=CheckerMetaClass):
CONTAINS_PATTERNS: list[str] = []
VERSION_PATTERNS: list[str] = []
FILENAME_PATTERNS: list[str] = []
REGEX_CONTAINS_PATTERNS: list[str] = []
REGEX_VERSION_PATTERNS: list[str] = []
REGEX_FILENAME_PATTERNS: list[str] = []
VENDOR_PRODUCT: list[tuple[str, str]] = []

def guess_contains(self, lines):
if any(pattern.search(lines) for pattern in self.CONTAINS_PATTERNS):
if any(pattern.search(lines) for pattern in self.REGEX_CONTAINS_PATTERNS):
return True
return False

def get_version(self, lines, filename):
version_info = dict()
def get_version(self, lines, filename, version_lines=None):
if version_lines == None:
version_lines = lines

if any(pattern.match(filename) for pattern in self.FILENAME_PATTERNS):
version_info["is_or_contains"] = "is"
if any(pattern.match(filename) for pattern in self.REGEX_FILENAME_PATTERNS):
self.version_info["is_or_contains"] = "is"

if "is_or_contains" not in version_info and self.guess_contains(lines):
version_info["is_or_contains"] = "contains"
if "is_or_contains" not in self.version_info and self.guess_contains(lines):
self.version_info["is_or_contains"] = "contains"

if "is_or_contains" in version_info:
version_info["version"] = regex_find(lines, self.VERSION_PATTERNS)
if "is_or_contains" in self.version_info:
version = regex_find(version_lines, self.REGEX_VERSION_PATTERNS)

return version_info
# Don't override a "correct" version with UNKNOWN
if "version" not in self.version_info or version != "UNKNOWN":
self.version_info["version"] = version

return self.version_info
4 changes: 2 additions & 2 deletions cve_bin_tool/checkers/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ class PythonChecker(Checker):
VERSION_PATTERNS = [r"python([23]+\.[0-9])"]
VENDOR_PRODUCT = [("python_software_foundation", "python"), ("python", "python")]

def get_version(self, lines, filename):
def get_version(self, lines, filename, version_lines=None):
# we will try to find python3+ as well as python2+

# currently regex will probably find a single string "lib/python3.6"
# where 3.6 is the version similarly "lib/python2.7" where 2.7 is the version
version_info = super().get_version(lines, filename)
version_info = super().get_version(lines, filename, version_lines)

# we will check if the guess returned some version probably 3.6 or 2.7 in our example
# return version_info
Expand Down
4 changes: 2 additions & 2 deletions cve_bin_tool/checkers/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ def guess_contains(self, lines):
# If that fails, find a signature that might indicate presence of sqlite
return super().guess_contains(lines)

def get_version(self, lines, filename):
def get_version(self, lines, filename, version_lines=None):
"""returns version information for sqlite as found in a given file.
The most correct way to do this is to search for the sha1 sums per release.
Fedora rpms have a simpler SQLite version string.
"""

version_info = super().get_version(lines, filename)
version_info = super().get_version(lines, filename, version_lines)

for mapping in self.VERSION_MAP:
# Truncate last four characters as "If the source code has been edited
Expand Down
64 changes: 61 additions & 3 deletions cve_bin_tool/version_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from pathlib import Path, PurePath
from typing import Iterator

import attr
from pyperscan import Flag, Pattern, Scan, StreamDatabase

from cve_bin_tool.checkers import Checker
from cve_bin_tool.cvedb import CVEDB
from cve_bin_tool.egg_updater import IS_DEVELOP, update_egg
Expand All @@ -29,6 +32,14 @@
import importlib_resources as resources


@attr.define
class HyperscanMatchContext:
version_scanner: VersionScanner
filename: str
lines: str
task_result: dict


class InvalidFileError(Exception):
"""Filepath is invalid for scanning."""

Expand All @@ -47,6 +58,7 @@ def __init__(
error_mode: ErrorMode = ErrorMode.TruncTrace,
score: int = 0,
validate: bool = True,
hyperscan_db: StreamDatabase = None,
):
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
# Update egg if installed in development mode
Expand Down Expand Up @@ -74,6 +86,9 @@ def __init__(
# self.logger.info("Checkers loaded: %s" % (", ".join(self.checkers.keys())))
self.language_checkers = self.available_language_checkers()

# Build hyperscan_db with all checker's patterns
self.hyperscan_db = self.build_hyperscan_database(self.checkers)

@classmethod
def load_checkers(cls) -> dict[str, type[Checker]]:
"""Loads CVE checkers"""
Expand Down Expand Up @@ -104,6 +119,9 @@ def remove_skiplist(self, skips: list[str]) -> None:
else:
self.logger.error(f"Checker {skipme} is not a valid checker name")

# Rebuild hyperscan_db with checker's patterns
self.hyperscan_db = self.build_hyperscan_database(self.checkers)

def print_checkers(self) -> None:
self.logger.info(f'Checkers: {", ".join(self.checkers.keys())}')

Expand Down Expand Up @@ -204,11 +222,51 @@ def scan_file(self, filename: str) -> Iterator[ScanInfo]:

yield from self.run_checkers(filename, lines)

def run_checkers(self, filename: str, lines: str) -> Iterator[ScanInfo]:
# tko
def build_hyperscan_database(self, checkers: Checker) -> StreamDatabase:
patterns = []
for (dummy_checker_name, checker) in self.checkers.items():
checker = checker()
result = checker.get_version(lines, filename)
checker.dummy_checker_name = dummy_checker_name
for pattern in checker.VERSION_PATTERNS + checker.CONTAINS_PATTERNS:
patterns.append(
Pattern(pattern.encode(), Flag.SOM_LEFTMOST, tag=checker)
)

if patterns:
return StreamDatabase(*patterns)
else:
return None

@staticmethod
def hyperscan_match(
context: HyperscanMatchContext, checker: Checker, offset: int, end: int
) -> Scan:
# hyperscan doesn't support group capture so use standard regex
# (i.e. get_version)
result = checker.get_version(
context.lines, context.filename, context.lines[offset:end]
)

context.task_result[checker] = result

return Scan.Continue

def run_checkers(self, filename: str, lines: str) -> Iterator[ScanInfo]:
task_result = dict()
hyperscan_context = HyperscanMatchContext(
version_scanner=self,
filename=filename,
lines=lines,
task_result=task_result,
)

if self.hyperscan_db is not None:
scanner = self.hyperscan_db.build(hyperscan_context, self.hyperscan_match)
scanner.scan(lines.encode())

for checker in task_result:
result = task_result[checker]
dummy_checker_name = checker.dummy_checker_name
# do some magic so we can iterate over all results, even the ones that just return 1 hit
if "is_or_contains" in result:
results = [dict()]
Expand Down
3 changes: 2 additions & 1 deletion requirements.csv
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ python,urllib3
google,gsutil
skontar,cvss
python_not_in_db,packaging
python_not_in_db,importlib_resources
python_not_in_db,importlib_resources
vlaci_not_in_db,pyperscan
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ gsutil
cvss
packaging<22.0
importlib_resources; python_version < "3.9"
pyperscan
6 changes: 3 additions & 3 deletions test/test_checkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ class MyChecker(Checker):
FILENAME_PATTERNS = [r"myproduct"]
VENDOR_PRODUCT = [("myvendor", "myproduct")]

assert type(MyChecker.CONTAINS_PATTERNS[0]) == Pattern
assert type(MyChecker.VERSION_PATTERNS[0]) == Pattern
assert type(MyChecker.FILENAME_PATTERNS[0]) == Pattern
assert type(MyChecker.REGEX_CONTAINS_PATTERNS[0]) == Pattern
assert type(MyChecker.REGEX_VERSION_PATTERNS[0]) == Pattern
assert type(MyChecker.REGEX_FILENAME_PATTERNS[0]) == Pattern
assert type(MyChecker.VENDOR_PRODUCT[0]) == VendorProductPair

def test_no_vpkg(self):
Expand Down

0 comments on commit 00a6ded

Please sign in to comment.