Skip to content

Commit

Permalink
Merge pull request aboutcode-org#1009 from nexB/vulntotal
Browse files Browse the repository at this point in the history
Add Vulntotal
  • Loading branch information
pombredanne authored Nov 21, 2022
2 parents e38eb1b + 6d838e6 commit bf49673
Show file tree
Hide file tree
Showing 56 changed files with 7,600 additions and 1 deletion.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,4 @@ dev =
[options.entry_points]
console_scripts =
vulnerablecode = vulnerablecode:command_line

vulntotal = vulntotal.vulntotal_cli:handler
11 changes: 11 additions & 0 deletions vulnerabilities/tests/util_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ def check_results_against_json(
with open(expected_file) as exp:
expected = json.load(exp)

check_results_against_expected(results, expected)


def check_results_against_expected(
results,
expected,
):
"""
Check the JSON-serializable mapping or sequence ``results`` against the
``expected``.
"""
# NOTE we redump the JSON as a YAML string for easier display of
# the failures comparison/diff
if results != expected:
Expand Down
8 changes: 8 additions & 0 deletions vulntotal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
27 changes: 27 additions & 0 deletions vulntotal/datasources/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from vulntotal.datasources import deps
from vulntotal.datasources import github
from vulntotal.datasources import gitlab
from vulntotal.datasources import oss_index
from vulntotal.datasources import osv
from vulntotal.datasources import snyk
from vulntotal.datasources import vulnerablecode
from vulntotal.validator import DataSource

DATASOURCE_REGISTRY = {
"deps": deps.DepsDataSource,
"github": github.GithubDataSource,
"gitlab": gitlab.GitlabDataSource,
"oss_index": oss_index.OSSDataSource,
"osv": osv.OSVDataSource,
"snyk": snyk.SnykDataSource,
"vulnerablecode": vulnerablecode.VulnerableCodeDataSource,
}
107 changes: 107 additions & 0 deletions vulntotal/datasources/deps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging
from typing import Iterable
from urllib.parse import quote

import requests

from vulntotal.validator import DataSource
from vulntotal.validator import VendorData

logger = logging.getLogger(__name__)


class DepsDataSource(DataSource):
spdx_license_expression = "TODO"
license_url = "TODO"

def fetch_json_response(self, url):
response = requests.get(url)
if not response.status_code == 200 or response.text == "Not Found":
logger.error(f"Error while fetching {url}")
return
return response.json()

def datasource_advisory(self, purl) -> Iterable[VendorData]:
payload = generate_meta_payload(purl)
response = self.fetch_json_response(payload)
if response:
advisories = parse_advisories_from_meta(response)
if advisories:
for advisory in advisories:
advisory_payload = generate_advisory_payload(advisory)
fetched_advisory = self.fetch_json_response(advisory_payload)
self._raw_dump.append(fetched_advisory)
if fetched_advisory:
return parse_advisory(fetched_advisory)

@classmethod
def supported_ecosystem(cls):
return {
"npm": "npm",
"maven": "maven",
"golang": "go",
"pypi": "pypi",
"cargo": "cargo",
# Coming soon
# "nuget": "nuget",
}


def parse_advisory(advisory) -> Iterable[VendorData]:
package = advisory["packages"][0]
affected_versions = [event["version"] for event in package["versionsAffected"]]
fixed_versions = [event["version"] for event in package["versionsUnaffected"]]
yield VendorData(
aliases=sorted(set(advisory["aliases"])),
affected_versions=sorted(set(affected_versions)),
fixed_versions=sorted(set(fixed_versions)),
)


def parse_advisories_from_meta(advisories_metadata):
advisories = []
dependencies = advisories_metadata.get("dependencies") or []
for dependency in dependencies:
advs = dependency.get("advisories") or []
advisories.extend(advs)
return advisories


def generate_advisory_payload(advisory_meta):
url_advisory = "https://deps.dev/_/advisory/{source}/{sourceID}"
return url_advisory.format(source=advisory_meta["source"], sourceID=advisory_meta["sourceID"])


def generate_meta_payload(purl):
url_advisories_meta = "https://deps.dev/_/s/{ecosystem}/p/{package}/v/{version}/dependencies"
supported_ecosystem = DepsDataSource.supported_ecosystem()
if purl.type in supported_ecosystem:
purl_version = purl.version
purl_name = purl.name

if purl.type == "maven":
if not purl.namespace:
logger.error(f"Invalid Maven PURL {str(purl)}")
return
purl_name = quote(f"{purl.namespace}:{purl.name}", safe="")

elif purl.type == "golang":
if purl.namespace:
purl_name = quote(f"{purl.namespace}/{purl.name}", safe="")
if not purl_version.startswith("v"):
purl_version = f"v{purl_version}"

return url_advisories_meta.format(
ecosystem=supported_ecosystem[purl.type],
package=purl_name,
version=purl_version,
)
140 changes: 140 additions & 0 deletions vulntotal/datasources/github.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging
from typing import Iterable

from vulnerabilities import utils
from vulntotal.validator import DataSource
from vulntotal.validator import VendorData
from vulntotal.vulntotal_utils import github_constraints_satisfied

logger = logging.getLogger(__name__)


class GithubDataSource(DataSource):
spdx_license_expression = "TODO"
license_url = "TODO"

def fetch_github(self, graphql_query):
return utils.fetch_github_graphql_query(graphql_query)

def datasource_advisory(self, purl) -> Iterable[VendorData]:
end_cursor = ""
interesting_edges = []
while True:
queryset = generate_graphql_payload(purl, end_cursor)
response = self.fetch_github(queryset)
self._raw_dump.append(response)
security_advisories = response["data"]["securityVulnerabilities"]
interesting_edges.extend(extract_interesting_edge(security_advisories["edges"], purl))
end_cursor = security_advisories["pageInfo"]["endCursor"]
if not security_advisories["pageInfo"]["hasNextPage"]:
break
return parse_advisory(interesting_edges)

@classmethod
def supported_ecosystem(cls):
return {
"maven": "MAVEN",
"nuget": "NUGET",
"composer": "COMPOSER",
"pypi": "PIP",
"gem": "RUBYGEMS",
"golang": "GO",
"rust": "RUST",
"npm": "NPM",
"erlang": "ERLANG",
}


def parse_advisory(interesting_edges) -> Iterable[VendorData]:
for edge in interesting_edges:
node = edge["node"]
aliases = [aliase["value"] for aliase in node["advisory"]["identifiers"]]
affected_versions = node["vulnerableVersionRange"].strip().replace(" ", "").split(",")
fixed_versions = [node["firstPatchedVersion"]["identifier"]]
yield VendorData(
aliases=sorted(list(set(aliases))),
affected_versions=sorted(list(set(affected_versions))),
fixed_versions=sorted(list(set(fixed_versions))),
)


def extract_interesting_edge(edges, purl):
interesting_edges = []
for edge in edges:
if github_constraints_satisfied(edge["node"]["vulnerableVersionRange"], purl.version):
interesting_edges.append(edge)
return interesting_edges


def generate_graphql_payload(purl, end_cursor):
GRAPHQL_QUERY_TEMPLATE = """
query{
securityVulnerabilities(first: 100, ecosystem: %s, package: "%s", %s){
edges {
node {
advisory {
identifiers {
type
value
}
summary
references {
url
}
severity
publishedAt
}
firstPatchedVersion{
identifier
}
package {
name
}
vulnerableVersionRange
}
}
pageInfo {
hasNextPage
endCursor
}
}
}
"""

supported_ecosystem = GithubDataSource.supported_ecosystem()

if purl.type not in supported_ecosystem:
return

end_cursor_exp = ""
ecosystem = supported_ecosystem[purl.type]
package_name = purl.name

if end_cursor:
end_cursor_exp = f'after: "{end_cursor}"'

if purl.type == "maven":
if not purl.namespace:
logger.error(f"Invalid Maven PURL {str(purl)}")
return
package_name = f"{purl.namespace}:{purl.name}"

elif purl.type == "composer":
if not purl.namespace:
logger.error(f"Invalid Composer PURL {str(purl)}")
return
package_name = f"{purl.namespace}/{purl.name}"

elif purl.type == "golang" and purl.namespace:
package_name = f"{purl.namespace}/{purl.name}"

return {"query": GRAPHQL_QUERY_TEMPLATE % (ecosystem, package_name, end_cursor_exp)}
Loading

0 comments on commit bf49673

Please sign in to comment.