Skip to content

Commit

Permalink
Update testing and importer #971
Browse files Browse the repository at this point in the history
Reference: #971

Signed-off-by: John M. Horan <[email protected]>
  • Loading branch information
johnmhoran committed Nov 29, 2022
1 parent 0a97250 commit 7176511
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 143 deletions.
2 changes: 0 additions & 2 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#

from vulnerabilities.importers import alpine_linux
from vulnerabilities.importers import apache_httpd
from vulnerabilities.importers import archlinux
from vulnerabilities.importers import debian
from vulnerabilities.importers import debian_oval
Expand Down Expand Up @@ -38,7 +37,6 @@
archlinux.ArchlinuxImporter,
ubuntu.UbuntuImporter,
debian_oval.DebianOvalImporter,
apache_httpd.ApacheHTTPDImporter,
]

IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}
161 changes: 107 additions & 54 deletions vulnerabilities/importers/apache_httpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,44 @@
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.package_managers import GitHubTagsAPI
from vulnerabilities.severity_systems import APACHE_HTTPD
from vulnerabilities.utils import nearest_patched_package

# from vulnerabilities.utils import nearest_patched_package


class ApacheHTTPDImporter(Importer):

base_url = "https://httpd.apache.org/security/json/"

# For now, don't use the GH API

def set_api(self):
self.version_api = GitHubTagsAPI()
asyncio.run(self.version_api.load_api(["apache/httpd"]))
self.version_api.cache["apache/httpd"] = set(
filter(
lambda version: version.value not in ignore_tags,
self.version_api.cache["apache/httpd"],
)
)
# def set_api(self):
# self.version_api = GitHubTagsAPI()
# asyncio.run(self.version_api.load_api(["apache/httpd"]))
# self.version_api.cache["apache/httpd"] = set(
# filter(
# lambda version: version.value not in ignore_tags,
# self.version_api.cache["apache/httpd"],
# )
# )

def updated_advisories(self):
links = fetch_links(self.base_url)
# For now, don't use the GH API
self.set_api()
# self.set_api()
advisories = []
for link in links:
data = requests.get(link).json()
advisories.append(self.to_advisory(data))
return self.batch_advisories(advisories)

def to_advisory(self, data):
cve = data["CVE_data_meta"]["ID"]
# cve = data["CVE_data_meta"]["ID"]
alias = data["CVE_data_meta"]["ID"]
descriptions = data["description"]["description_data"]
description = None
for desc in descriptions:
Expand All @@ -73,67 +75,118 @@ def to_advisory(self, data):
)
break
reference = Reference(
reference_id=cve,
url=urllib.parse.urljoin(self.base_url, f"{cve}.json"),
# reference_id=cve,
reference_id=alias,
# url=urllib.parse.urljoin(self.base_url, f"{cve}.json"),
url=urllib.parse.urljoin(self.base_url, f"{alias}.json"),
severities=severities,
)

# 2022-11-17 Thursday 19:02:16. This redraft of mine looks wrong and unnecessary -- current approach looks like what we want, since sampling suggests there are no real references in the JSON data and that there's always one value in ["impact"]["other"]
# reference_list = []
# # reference_data = data["references"]
# # if data["references"]["reference_data"]:
# if "reference_data" in data.get("references", {}):
# reference = Reference(
# reference_id=data["references"]["reference_data"][0]["refsource"],
# url=data["references"]["reference_data"][0]["refsource"],
# severities=severities,
# )
# else:
# reference = Reference(
# reference_id="",
# url="",
# severities=severities,
# )

versions_data = []
for vendor in data["affects"]["vendor"]["vendor_data"]:
for products in vendor["product"]["product_data"]:
for version_data in products["version"]["version_data"]:
versions_data.append(version_data)

fixed_version_ranges, affected_version_ranges = self.to_version_ranges(versions_data)
print("\n\n==> versions_data = {}\n".format(versions_data))
for version in versions_data:
print("\n\tversion = {}\n".format(version))
import json

# print(json.dumps(version, indent=2))
print("\n\tversion = \n{}\n".format(json.dumps(version, indent=2)))

# fixed_version_ranges, affected_version_ranges = self.to_version_ranges(versions_data)

fixed_version = []

for entry in data["timeline"]:
value = entry["value"]
# if "released" in entry["value"]:
if "released" in value:
# fixed_version.append(entry["value"])
fixed_version.append(value.split(" ")[0])

affected_packages = []
fixed_packages = []

for version_range in fixed_version_ranges:
fixed_packages.extend(
[
PackageURL(type="apache", name="httpd", version=version)
for version in self.version_api.get("apache/httpd").valid_versions
if SemverVersion(version) in version_range
]
)
# fixed_packages = []

for version_range in affected_version_ranges:
affected_packages.extend(
[
PackageURL(type="apache", name="httpd", version=version)
for version in self.version_api.get("apache/httpd").valid_versions
if SemverVersion(version) in version_range
]
for version in versions_data:
affected_package = AffectedPackage(
package=PackageURL(
type="generic",
name="apache_httpd",
),
# affected_version_range=affected_version_range,
affected_version_range=version.get("version_value", "ERROR!!"),
fixed_version=fixed_version[0],
# fixed_version="to come",
)
affected_packages.append(affected_package)

# for version_range in fixed_version_ranges:
# fixed_packages.extend(
# [
# PackageURL(type="apache", name="httpd", version=version)
# for version in self.version_api.get("apache/httpd").valid_versions
# if SemverVersion(version) in version_range
# ]
# )

# for version_range in affected_version_ranges:
# affected_packages.extend(
# [
# PackageURL(type="apache", name="httpd", version=version)
# for version in self.version_api.get("apache/httpd").valid_versions
# if SemverVersion(version) in version_range
# ]
# )

return AdvisoryData(
vulnerability_id=cve,
# vulnerability_id=cve,
aliases=[alias],
summary=description,
affected_packages=nearest_patched_package(affected_packages, fixed_packages),
# affected_packages=nearest_patched_package(affected_packages, fixed_packages),
affected_packages=affected_packages,
references=[reference],
)

def to_version_ranges(self, versions_data):
fixed_version_ranges = []
affected_version_ranges = []
for version_data in versions_data:
version_value = version_data["version_value"]
range_expression = version_data["version_affected"]
if range_expression == "<":
fixed_version_ranges.append(
VersionRange.from_scheme_version_spec_string(
"semver", ">={}".format(version_value)
)
)
elif range_expression == "=" or range_expression == "?=":
affected_version_ranges.append(
VersionRange.from_scheme_version_spec_string(
"semver", "{}".format(version_value)
)
)
# def to_version_ranges(self, versions_data):
# fixed_version_ranges = []
# affected_version_ranges = []
# for version_data in versions_data:
# version_value = version_data["version_value"]
# range_expression = version_data["version_affected"]
# if range_expression == "<":
# fixed_version_ranges.append(
# VersionRange.from_scheme_version_spec_string(
# "semver", ">={}".format(version_value)
# )
# )
# elif range_expression == "=" or range_expression == "?=":
# affected_version_ranges.append(
# VersionRange.from_scheme_version_spec_string(
# "semver", "{}".format(version_value)
# )
# )

return (fixed_version_ranges, affected_version_ranges)
# return (fixed_version_ranges, affected_version_ranges)


def fetch_links(url):
Expand Down
Loading

0 comments on commit 7176511

Please sign in to comment.