From d61a73fe062d1465e984095505d90cf23e0984f7 Mon Sep 17 00:00:00 2001 From: "John M. Horan" Date: Mon, 21 Nov 2022 12:54:16 -0800 Subject: [PATCH] Update testing and importer #971 Reference: https://github.com/nexB/vulnerablecode/issues/971 Signed-off-by: John M. Horan --- vulnerabilities/importers/__init__.py | 2 - vulnerabilities/importers/apache_httpd.py | 161 +++++++++----- vulnerabilities/tests/test_apache_httpd.py | 234 +++++++++++++-------- 3 files changed, 254 insertions(+), 143 deletions(-) diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 4bc45b42f..0c0cd7f8d 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -8,7 +8,6 @@ # from vulnerabilities.importers import alpine_linux -from vulnerabilities.importers import apache_httpd from vulnerabilities.importers import archlinux from vulnerabilities.importers import debian from vulnerabilities.importers import debian_oval @@ -38,7 +37,6 @@ archlinux.ArchlinuxImporter, ubuntu.UbuntuImporter, debian_oval.DebianOvalImporter, - apache_httpd.ApacheHTTPDImporter, ] IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY} diff --git a/vulnerabilities/importers/apache_httpd.py b/vulnerabilities/importers/apache_httpd.py index 95ec8b555..096c631f8 100644 --- a/vulnerabilities/importers/apache_httpd.py +++ b/vulnerabilities/importers/apache_httpd.py @@ -17,12 +17,14 @@ from univers.versions import SemverVersion from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackage from vulnerabilities.importer import Importer from vulnerabilities.importer import Reference from vulnerabilities.importer import VulnerabilitySeverity from vulnerabilities.package_managers import GitHubTagsAPI from vulnerabilities.severity_systems import APACHE_HTTPD -from vulnerabilities.utils import nearest_patched_package + +# from vulnerabilities.utils import nearest_patched_package class ApacheHTTPDImporter(Importer): @@ -30,21 +32,20 @@ class ApacheHTTPDImporter(Importer): base_url = "https://httpd.apache.org/security/json/" # For now, don't use the GH API - - def set_api(self): - self.version_api = GitHubTagsAPI() - asyncio.run(self.version_api.load_api(["apache/httpd"])) - self.version_api.cache["apache/httpd"] = set( - filter( - lambda version: version.value not in ignore_tags, - self.version_api.cache["apache/httpd"], - ) - ) + # def set_api(self): + # self.version_api = GitHubTagsAPI() + # asyncio.run(self.version_api.load_api(["apache/httpd"])) + # self.version_api.cache["apache/httpd"] = set( + # filter( + # lambda version: version.value not in ignore_tags, + # self.version_api.cache["apache/httpd"], + # ) + # ) def updated_advisories(self): links = fetch_links(self.base_url) # For now, don't use the GH API - self.set_api() + # self.set_api() advisories = [] for link in links: data = requests.get(link).json() @@ -52,7 +53,8 @@ def updated_advisories(self): return self.batch_advisories(advisories) def to_advisory(self, data): - cve = data["CVE_data_meta"]["ID"] + # cve = data["CVE_data_meta"]["ID"] + alias = data["CVE_data_meta"]["ID"] descriptions = data["description"]["description_data"] description = None for desc in descriptions: @@ -73,67 +75,118 @@ def to_advisory(self, data): ) break reference = Reference( - reference_id=cve, - url=urllib.parse.urljoin(self.base_url, f"{cve}.json"), + # reference_id=cve, + reference_id=alias, + # url=urllib.parse.urljoin(self.base_url, f"{cve}.json"), + url=urllib.parse.urljoin(self.base_url, f"{alias}.json"), severities=severities, ) + # 2022-11-17 Thursday 19:02:16. This redraft of mine looks wrong and unnecessary -- current approach looks like what we want, since sampling suggests there are no real references in the JSON data and that there's always one value in ["impact"]["other"] + # reference_list = [] + # # reference_data = data["references"] + # # if data["references"]["reference_data"]: + # if "reference_data" in data.get("references", {}): + # reference = Reference( + # reference_id=data["references"]["reference_data"][0]["refsource"], + # url=data["references"]["reference_data"][0]["refsource"], + # severities=severities, + # ) + # else: + # reference = Reference( + # reference_id="", + # url="", + # severities=severities, + # ) + versions_data = [] for vendor in data["affects"]["vendor"]["vendor_data"]: for products in vendor["product"]["product_data"]: for version_data in products["version"]["version_data"]: versions_data.append(version_data) - fixed_version_ranges, affected_version_ranges = self.to_version_ranges(versions_data) + print("\n\n==> versions_data = {}\n".format(versions_data)) + for version in versions_data: + print("\n\tversion = {}\n".format(version)) + import json + + # print(json.dumps(version, indent=2)) + print("\n\tversion = \n{}\n".format(json.dumps(version, indent=2))) + + # fixed_version_ranges, affected_version_ranges = self.to_version_ranges(versions_data) + + fixed_version = [] + + for entry in data["timeline"]: + value = entry["value"] + # if "released" in entry["value"]: + if "released" in value: + # fixed_version.append(entry["value"]) + fixed_version.append(value.split(" ")[0]) affected_packages = [] - fixed_packages = [] - - for version_range in fixed_version_ranges: - fixed_packages.extend( - [ - PackageURL(type="apache", name="httpd", version=version) - for version in self.version_api.get("apache/httpd").valid_versions - if SemverVersion(version) in version_range - ] - ) + # fixed_packages = [] - for version_range in affected_version_ranges: - affected_packages.extend( - [ - PackageURL(type="apache", name="httpd", version=version) - for version in self.version_api.get("apache/httpd").valid_versions - if SemverVersion(version) in version_range - ] + for version in versions_data: + affected_package = AffectedPackage( + package=PackageURL( + type="generic", + name="apache_httpd", + ), + # affected_version_range=affected_version_range, + affected_version_range=version.get("version_value", "ERROR!!"), + fixed_version=fixed_version[0], + # fixed_version="to come", ) + affected_packages.append(affected_package) + + # for version_range in fixed_version_ranges: + # fixed_packages.extend( + # [ + # PackageURL(type="apache", name="httpd", version=version) + # for version in self.version_api.get("apache/httpd").valid_versions + # if SemverVersion(version) in version_range + # ] + # ) + + # for version_range in affected_version_ranges: + # affected_packages.extend( + # [ + # PackageURL(type="apache", name="httpd", version=version) + # for version in self.version_api.get("apache/httpd").valid_versions + # if SemverVersion(version) in version_range + # ] + # ) return AdvisoryData( - vulnerability_id=cve, + # vulnerability_id=cve, + aliases=[alias], summary=description, - affected_packages=nearest_patched_package(affected_packages, fixed_packages), + # affected_packages=nearest_patched_package(affected_packages, fixed_packages), + affected_packages=affected_packages, references=[reference], ) - def to_version_ranges(self, versions_data): - fixed_version_ranges = [] - affected_version_ranges = [] - for version_data in versions_data: - version_value = version_data["version_value"] - range_expression = version_data["version_affected"] - if range_expression == "<": - fixed_version_ranges.append( - VersionRange.from_scheme_version_spec_string( - "semver", ">={}".format(version_value) - ) - ) - elif range_expression == "=" or range_expression == "?=": - affected_version_ranges.append( - VersionRange.from_scheme_version_spec_string( - "semver", "{}".format(version_value) - ) - ) + # def to_version_ranges(self, versions_data): + # fixed_version_ranges = [] + # affected_version_ranges = [] + # for version_data in versions_data: + # version_value = version_data["version_value"] + # range_expression = version_data["version_affected"] + # if range_expression == "<": + # fixed_version_ranges.append( + # VersionRange.from_scheme_version_spec_string( + # "semver", ">={}".format(version_value) + # ) + # ) + # elif range_expression == "=" or range_expression == "?=": + # affected_version_ranges.append( + # VersionRange.from_scheme_version_spec_string( + # "semver", "{}".format(version_value) + # ) + # ) - return (fixed_version_ranges, affected_version_ranges) + # return (fixed_version_ranges, affected_version_ranges) def fetch_links(url): diff --git a/vulnerabilities/tests/test_apache_httpd.py b/vulnerabilities/tests/test_apache_httpd.py index 1d0e1ab18..97c4b7d65 100644 --- a/vulnerabilities/tests/test_apache_httpd.py +++ b/vulnerabilities/tests/test_apache_httpd.py @@ -19,98 +19,158 @@ from vulnerabilities.importer import Reference from vulnerabilities.importer import VulnerabilitySeverity from vulnerabilities.importers.apache_httpd import ApacheHTTPDImporter + +# from vulnerabilities.importers.apache_httpd import to_advisory from vulnerabilities.package_managers import GitHubTagsAPI from vulnerabilities.package_managers import PackageVersion +from vulnerabilities.tests import util_tests from vulnerabilities.utils import AffectedPackage BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_DATA = os.path.join(BASE_DIR, "test_data", "apache_httpd", "CVE-1999-1199.json") +# TEST_DATA = os.path.join(BASE_DIR, "test_data", "apache_httpd", "CVE-1999-1199.json") +TEST_DATA = os.path.join(BASE_DIR, "test_data/apache_httpd") + + +# class TestApacheHTTPDImporter(TestCase): +# @classmethod +# def setUpClass(cls): +# data_source_cfg = {"etags": {}} +# cls.data_src = ApacheHTTPDImporter(1, config=data_source_cfg) +# known_versions = [PackageVersion("1.3.2"), PackageVersion("1.3.1"), PackageVersion("1.3.0")] +# cls.data_src.version_api = GitHubTagsAPI(cache={"apache/httpd": known_versions}) +# with open(TEST_DATA) as f: +# cls.data = json.load(f) + +# def test_to_version_ranges(self): +# data = [ +# { +# "version_affected": "?=", +# "version_value": "1.3.0", +# }, +# { +# "version_affected": "=", +# "version_value": "1.3.1", +# }, +# { +# "version_affected": "<", +# "version_value": "1.3.2", +# }, +# ] +# fixed_version_ranges, affected_version_ranges = self.data_src.to_version_ranges(data) + +# # Check fixed packages +# assert [ +# VersionRange.from_scheme_version_spec_string("semver", ">=1.3.2") +# ] == fixed_version_ranges + +# # Check vulnerable packages +# assert [ +# VersionRange.from_scheme_version_spec_string("semver", "==1.3.0"), +# VersionRange.from_scheme_version_spec_string("semver", "==1.3.1"), +# ] == affected_version_ranges + +# def test_to_advisory(self): +# expected_advisories = [ +# AdvisoryData( +# summary="A serious problem exists when a client sends a large number of " +# "headers with the same header name. Apache uses up memory faster than the " +# "amount of memory required to simply store the received data itself. That " +# "is, memory use increases faster and faster as more headers are received, " +# "rather than increasing at a constant rate. This makes a denial of service " +# "attack based on this method more effective than methods which cause Apache" +# " to use memory at a constant rate, since the attacker has to send less data.", +# affected_packages=[ +# AffectedPackage( +# vulnerable_package=PackageURL( +# type="apache", +# name="httpd", +# version="1.3.0", +# ), +# ), +# AffectedPackage( +# vulnerable_package=PackageURL( +# type="apache", +# name="httpd", +# version="1.3.1", +# ), +# ), +# ], +# references=[ +# Reference( +# url="https://httpd.apache.org/security/json/CVE-1999-1199.json", +# severities=[ +# VulnerabilitySeverity( +# system=severity_systems.APACHE_HTTPD, +# value="important", +# ), +# ], +# reference_id="CVE-1999-1199", +# ), +# ], +# vulnerability_id="CVE-1999-1199", +# ) +# ] +# found_advisories = [self.data_src.to_advisory(self.data)] +# found_advisories = list(map(AdvisoryData.normalized, found_advisories)) +# expected_advisories = list(map(AdvisoryData.normalized, expected_advisories)) +# assert sorted(found_advisories) == sorted(expected_advisories) + +# def test_misc_01(self): +# print("\nHello!\n") +# assert True == True + + +# def test_misc_01(): +# print("\nHello!\n") +# assert True == True + + +# def test_to_advisory(): +# with open(os.path.join(TEST_DATA, "CVE-1999-1199.json")) as f: +# raw_data = json.load(f) + +# print("\n\nraw_data = \n{}\n".format(raw_data)) + +# # print("\npretty raw_data = {}".format(json.dumps(raw_data, indent=4))) + +# # The following throws an error: TypeError: to_advisory() missing 1 required positional argument: 'data' +# # presumably because it also needs to pass 'self'? +# advisories = ApacheHTTPDImporter.to_advisory(raw_data) +# # result = [data.to_dict() for data in advisories] +# # expected_file = os.path.join(TEST_DATA, f"parse-advisory-postgresql-expected.json") +# # util_tests.check_results_against_json(result, expected_file) class TestApacheHTTPDImporter(TestCase): - @classmethod - def setUpClass(cls): - data_source_cfg = {"etags": {}} - cls.data_src = ApacheHTTPDImporter(1, config=data_source_cfg) - known_versions = [PackageVersion("1.3.2"), PackageVersion("1.3.1"), PackageVersion("1.3.0")] - cls.data_src.version_api = GitHubTagsAPI(cache={"apache/httpd": known_versions}) - with open(TEST_DATA) as f: - cls.data = json.load(f) - - def test_to_version_ranges(self): - data = [ - { - "version_affected": "?=", - "version_value": "1.3.0", - }, - { - "version_affected": "=", - "version_value": "1.3.1", - }, - { - "version_affected": "<", - "version_value": "1.3.2", - }, - ] - fixed_version_ranges, affected_version_ranges = self.data_src.to_version_ranges(data) - - # Check fixed packages - assert [ - VersionRange.from_scheme_version_spec_string("semver", ">=1.3.2") - ] == fixed_version_ranges - - # Check vulnerable packages - assert [ - VersionRange.from_scheme_version_spec_string("semver", "==1.3.0"), - VersionRange.from_scheme_version_spec_string("semver", "==1.3.1"), - ] == affected_version_ranges - - def test_to_advisory(self): - expected_advisories = [ - AdvisoryData( - summary="A serious problem exists when a client sends a large number of " - "headers with the same header name. Apache uses up memory faster than the " - "amount of memory required to simply store the received data itself. That " - "is, memory use increases faster and faster as more headers are received, " - "rather than increasing at a constant rate. This makes a denial of service " - "attack based on this method more effective than methods which cause Apache" - " to use memory at a constant rate, since the attacker has to send less data.", - affected_packages=[ - AffectedPackage( - vulnerable_package=PackageURL( - type="apache", - name="httpd", - version="1.3.0", - ), - ), - AffectedPackage( - vulnerable_package=PackageURL( - type="apache", - name="httpd", - version="1.3.1", - ), - ), - ], - references=[ - Reference( - url="https://httpd.apache.org/security/json/CVE-1999-1199.json", - severities=[ - VulnerabilitySeverity( - system=severity_systems.APACHE_HTTPD, - value="important", - ), - ], - reference_id="CVE-1999-1199", - ), - ], - vulnerability_id="CVE-1999-1199", - ) - ] - found_advisories = [self.data_src.to_advisory(self.data)] - found_advisories = list(map(AdvisoryData.normalized, found_advisories)) - expected_advisories = list(map(AdvisoryData.normalized, expected_advisories)) - assert sorted(found_advisories) == sorted(expected_advisories) - - def test_misc_01(self): - print("\nHello!\n") - assert True == True + base_url = "https://httpd.apache.org/security/json/" + + def test_to_advisory_in_class(self): + # print("\nHello!\n") + with open(os.path.join(TEST_DATA, "CVE-1999-1199.json")) as f: + raw_data = json.load(f) + + # print("\n\nraw_data = \n{}\n".format(raw_data)) + # print("\npretty raw_data = {}".format(json.dumps(raw_data, indent=2))) + + advisory = ApacheHTTPDImporter.to_advisory(self, raw_data) + + print("\n\nadvisory = \n{}\n".format(advisory)) + + print("advisory.aliases = {}\n".format(advisory.aliases)) + + print("advisory.summary = {}\n".format(advisory.summary)) + + print("advisory.affected_packages = {}\n".format(advisory.affected_packages)) + + print("advisory.references = {}\n".format(advisory.references)) + for ref in advisory.references: + print("\treference = {}\n".format(ref)) + + print("advisory.date_published = {}\n".format(advisory.date_published)) + + # result = [data.to_dict() for data in advisories] + result = advisory.to_dict() + + print("result = {}\n".format(result)) + + print("\npretty result = \n{}".format(json.dumps(result, indent=2)))