diff --git a/vulntotal/datasources/deps.py b/vulntotal/datasources/deps.py index 33c62773d..413c0d18b 100644 --- a/vulntotal/datasources/deps.py +++ b/vulntotal/datasources/deps.py @@ -26,12 +26,21 @@ class DepsDataSource(DataSource): def fetch_json_response(self, url): response = requests.get(url) - if not response.status_code == 200 or response.text == "Not Found": + if response.status_code != 200 or response.text == "Not Found": logger.error(f"Error while fetching {url}") return return response.json() def datasource_advisory(self, purl) -> Iterable[VendorData]: + """ + Fetch and parse advisories from a given purl. + + Parameters: + purl: A PackageURL object representing the package URL. + + Returns: + An iterable of VendorData objects containing the advisory information. + """ payload = generate_meta_payload(purl) response = self.fetch_json_response(payload) if response: @@ -58,6 +67,16 @@ def supported_ecosystem(cls): def parse_advisory(advisory, purl) -> Iterable[VendorData]: + """ + Parse an advisory into a VendorData object. + + Parameters: + advisory: A dictionary representing the advisory data. + purl: PURL for the advisory. + + Yields: + VendorData instance containing purl, aliases, affected_versions and fixed_versions. + """ package = advisory["packages"][0] affected_versions = [event["version"] for event in package["versionsAffected"]] fixed_versions = [event["version"] for event in package["versionsUnaffected"]] @@ -70,6 +89,15 @@ def parse_advisory(advisory, purl) -> Iterable[VendorData]: def parse_advisories_from_meta(advisories_metadata): + """ + Parse advisories from the given metadata. + + Parameters: + advisories_metadata: A dictionary representing the metadata of the advisories. + + Returns: + A list of dictionaries, each representing an advisory. 
+ """ advisories = [] dependencies = advisories_metadata.get("dependencies") or [] for dependency in dependencies: @@ -84,6 +112,15 @@ def generate_advisory_payload(advisory_meta): def generate_meta_payload(purl): + """ + Generate a payload for fetching advisories metadata from a given purl. + + Parameters: + purl: A PackageURL object representing the package URL. + + Returns: + A string representing the payload for fetching advisories metadata. It should be a valid URL that contains the ecosystem, package name and package version of the dependency. + """ url_advisories_meta = "https://deps.dev/_/s/{ecosystem}/p/{package}/v/{version}/dependencies" supported_ecosystem = DepsDataSource.supported_ecosystem() if purl.type in supported_ecosystem: diff --git a/vulntotal/datasources/github.py b/vulntotal/datasources/github.py index ab31aa71e..59ac65679 100644 --- a/vulntotal/datasources/github.py +++ b/vulntotal/datasources/github.py @@ -101,7 +101,7 @@ def parse_advisory(interesting_edges, purl) -> Iterable[VendorData]: """ for edge in interesting_edges: node = edge["node"] - aliases = [aliase["value"] for aliase in get_item(node, "advisory", "identifiers")] + aliases = [alias["value"] for alias in get_item(node, "advisory", "identifiers")] affected_versions = node["vulnerableVersionRange"].strip().replace(" ", "").split(",") parsed_fixed_versions = get_item(node, "firstPatchedVersion", "identifier") fixed_versions = [parsed_fixed_versions] if parsed_fixed_versions else [] diff --git a/vulntotal/datasources/gitlab.py b/vulntotal/datasources/gitlab.py index 55aaa5b99..ae805ab00 100644 --- a/vulntotal/datasources/gitlab.py +++ b/vulntotal/datasources/gitlab.py @@ -31,6 +31,15 @@ class GitlabDataSource(DataSource): license_url = "TODO" def datasource_advisory(self, purl) -> Iterable[VendorData]: + """ + Fetches advisories for a given purl from the GitLab API. + + Parameters: + purl: A PackageURL instance representing the package to query. 
+ + Yields: + VendorData instance containing the advisory information for the package. + """ package_slug = get_package_slug(purl) location = download_subtree(package_slug, speculative_execution=True) if not location: @@ -60,6 +69,15 @@ def supported_ecosystem(cls): def get_package_slug(purl): + """ + Constructs a package slug from a given purl. + + Parameters: + purl: A PackageURL instance representing the package to query. + + Returns: + A string representing the package slug, or None if the purl type is not supported by GitLab. + """ supported_ecosystem = GitlabDataSource.supported_ecosystem() if purl.type not in supported_ecosystem: @@ -75,6 +93,16 @@ def get_package_slug(purl): def download_subtree(package_slug: str, speculative_execution=False): + """ + Downloads and extracts a tar file from a given package slug. + + Parameters: + package_slug: A string representing the package slug to query. + speculative_execution: A boolean indicating whether to log errors or not. + + Returns: + A Path object representing the extracted location, or None if an error occurs. + """ url = f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/archive/master/gemnasium-db-master.tar.gz?path={package_slug}" response = fetch(url) if os.path.getsize(response.location) > 0: @@ -91,6 +119,12 @@ def download_subtree(package_slug: str, speculative_execution=False): def clear_download(location): + """ + Deletes a directory and its contents. + + Parameters: + location: A Path object representing the directory to delete. 
+ """ if location: shutil.rmtree(location) @@ -133,9 +167,9 @@ def get_casesensitive_slug(path, package_slug): } ] url = "https://gitlab.com/api/graphql" - hasnext = True + has_next = True - while hasnext: + while has_next: response = requests.post(url, json=payload).json() paginated_tree = response[0]["data"]["project"]["repository"]["paginatedTree"] @@ -149,14 +183,26 @@ def get_casesensitive_slug(path, package_slug): return get_gitlab_style_slug(slug_flatpath, package_slug) payload[0]["variables"]["nextPageCursor"] = paginated_tree["pageInfo"]["endCursor"] - hasnext = paginated_tree["pageInfo"]["hasNextPage"] + has_next = paginated_tree["pageInfo"]["hasNextPage"] def parse_interesting_advisories(location, purl, delete_download=False) -> Iterable[VendorData]: + """ + Parses advisories from YAML files in a given location that match a given version. + + Parameters: + location: A Path object representing the location of the YAML files. + purl: PURL for the advisory; its version (purl.version) is the one + checked against the affected range, not a separate argument. + delete_download: A boolean indicating whether to delete the downloaded files after parsing. + + Yields: + VendorData instance containing the advisory information for the package. + """ version = purl.version path = Path(location) - glob = "**/*.yml" - files = (p for p in path.glob(glob) if p.is_file()) + pattern = "**/*.yml" + files = [p for p in path.glob(pattern) if p.is_file()] for file in sorted(files): with open(file) as f: gitlab_advisory = saneyaml.load(f) diff --git a/vulntotal/datasources/oss_index.py b/vulntotal/datasources/oss_index.py index d00454304..a18e65d21 100644 --- a/vulntotal/datasources/oss_index.py +++ b/vulntotal/datasources/oss_index.py @@ -27,6 +27,14 @@ class OSSDataSource(DataSource): api_authenticated = "https://ossindex.sonatype.org/api/v3/authorized/component-report" def fetch_json_response(self, coordinates): + """Fetch JSON response from OSS Index API for a given list of coordinates. 
+ + Parameters: + coordinates: A list of strings representing the package coordinates. + + Returns: + A dictionary containing the JSON response from the OSS Index API, or None if the response is unsuccessful or an error occurs while fetching data. + """ username = os.environ.get("OSS_USERNAME", None) token = os.environ.get("OSS_TOKEN", None) auth = None @@ -35,20 +43,21 @@ def fetch_json_response(self, coordinates): auth = (username, token) url = self.api_authenticated response = requests.post(url, auth=auth, json={"coordinates": coordinates}) - - if response.status_code == 200: + try: + response.raise_for_status() return response.json() - elif response.status_code == 401: - logger.error("Invalid credentials") - elif response.status_code == 429: - msg = ( - "Too many requests" - if auth - else "Too many requests: add OSS_USERNAME and OSS_TOKEN in .env file" - ) - logger.error(msg) - else: - logger.error(f"unknown status code: {response.status_code} while fetching: {url}") + except requests.exceptions.HTTPError as e: + if e.response.status_code == 401: + logger.error("Invalid credentials") + elif e.response.status_code == 429: + msg = ( + "Too many requests" + if auth + else "Too many requests: add OSS_USERNAME and OSS_TOKEN in .env file" + ) + logger.error(msg) + else: + logger.error(f"Unknown status code: {e.response.status_code} while fetching: {url}") def datasource_advisory(self, purl) -> Iterable[VendorData]: if purl.type not in self.supported_ecosystem(): @@ -81,6 +90,16 @@ def supported_ecosystem(cls): def parse_advisory(component, purl) -> Iterable[VendorData]: + """ + Parse component from OSS Index API and yield VendorData. + + Parameters: + component: A list containing a dictionary with component details. + purl: PURL for the advisory. + + Yields: + VendorData instance containing advisory information for the component. 
+ """ response = component[0] vulnerabilities = response.get("vulnerabilities") or [] for vuln in vulnerabilities: diff --git a/vulntotal/datasources/osv.py b/vulntotal/datasources/osv.py index e7360cc24..4adf6322c 100644 --- a/vulntotal/datasources/osv.py +++ b/vulntotal/datasources/osv.py @@ -27,11 +27,21 @@ class OSVDataSource(DataSource): url = "https://api.osv.dev/v1/query" def fetch_advisory(self, payload): - """Fetch JSON advisory from OSV API for a given package payload""" + """ + Fetch JSON advisory from OSV API for a given package payload + + Parameters: + payload: A dictionary representing the package data to query. + + Returns: + A JSON object containing the advisory information for the package, or None if an error occurs while fetching data from the OSV API. + """ response = requests.post(self.url, data=str(payload)) - if not response.status_code == 200: - logger.error(f"Error while fetching {payload}: {response.status_code}") + try: + response.raise_for_status() + except requests.exceptions.HTTPError as e: + logger.error(f"Error while fetching {payload}: {e}") return return response.json() @@ -66,6 +76,12 @@ def supported_ecosystem(cls): def parse_advisory(response, purl) -> Iterable[VendorData]: """ Parse response from OSV API and yield VendorData + + Parameters: + response: A JSON object containing the response data from the OSV API. + + Yields: + VendorData instance containing the advisory information for the package. 
""" for vuln in response.get("vulns") or []: @@ -79,17 +95,17 @@ def parse_advisory(response, purl) -> Iterable[VendorData]: try: affected_versions.extend(get_item(vuln, "affected", 0, "versions") or []) - except: - pass + except (KeyError, TypeError, IndexError) as e: + logger.error(f"Error while parsing affected versions: {e}") try: - for event in get_item(vuln, "affected", 0, "ranges", 0, "events") or []: - affected_versions.append(event.get("introduced")) if event.get( - "introduced" - ) else None - fixed.append(event.get("fixed")) if event.get("fixed") else None - except: - pass + events = get_item(vuln, "affected", 0, "ranges", 0, "events") or [] + affected_versions.extend( + [event.get("introduced") for event in events if event.get("introduced")] + ) + fixed.extend([event.get("fixed") for event in events if event.get("fixed")]) + except (KeyError, TypeError, IndexError) as e: + logger.error(f"Error while parsing events: {e}") yield VendorData( purl=PackageURL(purl.type, purl.namespace, purl.name), @@ -100,7 +116,15 @@ def parse_advisory(response, purl) -> Iterable[VendorData]: def generate_payload(purl): - """Generate compatible payload for OSV API from a PURL""" + """ + Generate compatible payload for OSV API from a PURL + + Parameters: + purl: A PackageURL instance representing the package to query. + + Returns: + A dictionary containing the package data compatible with the OSV API. 
+ """ supported_ecosystem = OSVDataSource.supported_ecosystem() payload = {} diff --git a/vulntotal/datasources/snyk.py b/vulntotal/datasources/snyk.py index 4b1a173f8..4af4fa619 100644 --- a/vulntotal/datasources/snyk.py +++ b/vulntotal/datasources/snyk.py @@ -17,7 +17,7 @@ from vulntotal.validator import DataSource from vulntotal.validator import VendorData -from vulntotal.vulntotal_utils import snky_constraints_satisfied +from vulntotal.vulntotal_utils import snyk_constraints_satisfied logger = logging.getLogger(__name__) @@ -27,15 +27,36 @@ class SnykDataSource(DataSource): license_url = "TODO" def fetch(self, url): + """ + Fetch the content of a given URL. + + Parameters: + url: A string representing the URL to fetch. + + Returns: + A string of HTML or a dictionary of JSON if the response is successful, + or None if the response is unsuccessful. + """ response = requests.get(url) - if not response.status_code == 200: - logger.error(f"Error while fetching {url}") + try: + response.raise_for_status() + except requests.exceptions.HTTPError as e: + logger.error(f"Error while fetching {url}: {e}") return if response.headers["content-type"] == "application/json, charset=utf-8": return response.json() return response.text def datasource_advisory(self, purl) -> Iterable[VendorData]: + """ + Fetch advisories from Snyk for a given package. + + Parameters: + purl: A PackageURL instance representing the package. + + Yields: + VendorData instance containing advisory information. + """ package_advisory_url = generate_package_advisory_url(purl) package_advisories_list = self.fetch(package_advisory_url) self._raw_dump.append(package_advisories_list) @@ -68,6 +89,15 @@ def supported_ecosystem(cls): def generate_package_advisory_url(purl): + """ + Generate a URL for fetching advisories from Snyk for a given package. + + Parameters: + purl: A PackageURL instance representing the package. + + Returns: + A string containing the URL or None if the package is not supported by Snyk. 
+ """ url_package_advisories = "https://security.snyk.io/package/{ecosystem}/{package}" # Pseudo API, unfortunately gives only 30 vulnerability per package, but this is the best we have for unmanaged packages @@ -103,13 +133,22 @@ def generate_package_advisory_url(purl): def extract_html_json_advisories(package_advisories): - vulnerablity = {} + """ + Extract vulnerability information from HTML or JSON advisories. + + Parameters: + package_advisories: A string of HTML or a dictionary of JSON containing advisories for a package. + + Returns: + A dictionary mapping vulnerability IDs to lists of affected versions for the package. + """ + vulnerability = {} # If advisories are json and is obtained through pseudo API if isinstance(package_advisories, dict): if package_advisories["status"] == "ok": for vuln in package_advisories["vulnerabilities"]: - vulnerablity[vuln["id"]] = vuln["semver"]["vulnerable"] + vulnerability[vuln["id"]] = vuln["semver"]["vulnerable"] else: soup = BeautifulSoup(package_advisories, "html.parser") vulns_table = soup.find("tbody", class_="vue--table__tbody") @@ -121,11 +160,23 @@ def extract_html_json_advisories(package_advisories): "span", class_="vue--chip vulnerable-versions__chip vue--chip--default" ) affected_versions = [vers.text.strip() for vers in ranges] - vulnerablity[anchor["href"].rsplit("/", 1)[-1]] = affected_versions - return vulnerablity + vulnerability[anchor["href"].rsplit("/", 1)[-1]] = affected_versions + return vulnerability def parse_html_advisory(advisory_html, snyk_id, affected, purl) -> VendorData: + """ + Parse HTML advisory from Snyk and extract vendor data. + + Parameters: + advisory_html: A string of HTML containing the advisory details. + snyk_id: A string representing the Snyk ID of the vulnerability. + affected: A list of strings representing the affected versions. + purl: PURL for the advisory. + + Returns: + A VendorData instance containing aliases, affected versions and fixed versions for the vulnerability. 
+ """ aliases = [] fixed_versions = [] @@ -154,10 +205,7 @@ def parse_html_advisory(advisory_html, snyk_id, affected, purl) -> VendorData: def is_purl_in_affected(version, affected): - for affected_range in affected: - if snky_constraints_satisfied(affected_range, version): - return True - return False + return any(snyk_constraints_satisfied(affected_range, version) for affected_range in affected) def generate_advisory_payload(snyk_id): diff --git a/vulntotal/datasources/vulnerablecode.py b/vulntotal/datasources/vulnerablecode.py index 7fa4c2709..d0122db83 100644 --- a/vulntotal/datasources/vulnerablecode.py +++ b/vulntotal/datasources/vulnerablecode.py @@ -30,22 +30,49 @@ class VulnerableCodeDataSource(DataSource): vc_purl_search_api_path = "api/packages/bulk_search/" def fetch_post_json(self, payload): + """ + Fetches JSON data from the VulnerableCode API using a POST request with a given payload. + + Parameters: + payload: A dictionary representing the data to send in the request body. + + Returns: + A JSON object containing the response data, or None if an error occurs while fetching data from the VulnerableCode API. + """ url = urljoin(self.global_instance, self.vc_purl_search_api_path) response = fetch_vulnerablecode_query(url=url, payload=payload) - if not response.status_code == 200: + if response.status_code != 200: logger.error(f"Error while fetching {url}") return return response.json() def fetch_get_json(self, url): + """ + Fetches JSON data from a given URL using the VulnerableCode API. + + Parameters: + url: A string representing the URL to query. + + Returns: + A JSON object containing the response data, or None if an error occurs while fetching data from the URL. 
+ """ response = fetch_vulnerablecode_query(url=url, payload=None) - if not response.status_code == 200: + if response.status_code != 200: logger.error(f"Error while fetching {url}") return return response.json() def datasource_advisory(self, purl) -> Iterable[VendorData]: - if purl.type not in self.supported_ecosystem() or not purl.version: + """ + Fetches advisories for a given purl from the VulnerableCode API. + + Parameters: + purl: A PackageURL instance representing the package to query. + + Yields: + VendorData instance containing the advisory information for the package. + """ + if purl.type not in self.supported_ecosystem() or purl.version is None: return metadata_advisories = self.fetch_post_json({"purls": [str(purl)]}) self._raw_dump.append(metadata_advisories) @@ -101,10 +128,10 @@ class VCIOTokenError(Exception): def fetch_vulnerablecode_query(url: str, payload: dict): """ Requires VCIO API key in .env file - For example:: - - VCIO_TOKEN="OJ78Os2IPfM80hqVT2ek+1QnrTKvsX1HdOMABq3pmQd" + For example: + VCIO_TOKEN='OJ78Os2IPfM80hqVT2ek+1QnrTKvsX1HdOMABq3pmQd' """ + load_dotenv() vcio_token = os.environ.get("VCIO_TOKEN", None) if not vcio_token: diff --git a/vulntotal/vulntotal_cli.py b/vulntotal/vulntotal_cli.py index 4106b0f98..c65007ba8 100755 --- a/vulntotal/vulntotal_cli.py +++ b/vulntotal/vulntotal_cli.py @@ -304,22 +304,22 @@ def prettyprint(purl, datasources, pagination, no_threading): def group_by_cve(vulnerabilities): grouped_by_cve = {} - nocve = [] - noadvisory = [] + no_cve = [] + no_advisory = [] for datasource, advisories in vulnerabilities.items(): if not advisories: - noadvisory.append([datasource.upper(), "", "", ""]) + no_advisory.append([datasource.upper(), "", "", ""]) for advisory in advisories: cve = next((x for x in advisory.aliases if x.startswith("CVE")), None) if not cve: - nocve.append(formatted_row(datasource, advisory)) + no_cve.append(formatted_row(datasource, advisory)) continue if cve not in grouped_by_cve: 
grouped_by_cve[cve] = [] grouped_by_cve[cve].append(formatted_row(datasource, advisory)) - grouped_by_cve["NOCVE"] = nocve - grouped_by_cve["NOADVISORY"] = noadvisory + grouped_by_cve["NOCVE"] = no_cve + grouped_by_cve["NOADVISORY"] = no_advisory return grouped_by_cve diff --git a/vulntotal/vulntotal_utils.py b/vulntotal/vulntotal_utils.py index 787ff9d92..79d866e05 100644 --- a/vulntotal/vulntotal_utils.py +++ b/vulntotal/vulntotal_utils.py @@ -80,14 +80,14 @@ def parse_constraint(constraint): return constraint[-1], constraint[:-1] -def github_constraints_satisfied(github_constrain, version): +def github_constraints_satisfied(github_constraint, version): """ Return True or False depending on whether the given version satisfies the github constraint For example: >>> assert github_constraints_satisfied(">= 7.0.0, <= 7.6.57", "7.1.1") == True >>> assert github_constraints_satisfied(">= 10.4.0, <= 10.4.1", "10.6.0") == False """ - gh_constraints = github_constrain.strip().replace(" ", "") + gh_constraints = github_constraint.strip().replace(" ", "") constraints = gh_constraints.split(",") for constraint in constraints: gh_comparator, gh_version = parse_constraint(constraint) @@ -98,15 +98,15 @@ def github_constraints_satisfied(github_constrain, version): return True -def snky_constraints_satisfied(snyk_constrain, version): +def snyk_constraints_satisfied(snyk_constraint, version): """ Return True or False depending on whether the given version satisfies the snyk constraint For example: - >>> assert snky_constraints_satisfied(">=4.0.0, <4.0.10.16", "4.0.10.15") == True - >>> assert snky_constraints_satisfied(" >=4.1.0, <4.4.15.7", "4.0.10.15") == False - >>> assert snky_constraints_satisfied("[3.0.0,3.1.25)", "3.0.2") == True + >>> assert snyk_constraints_satisfied(">=4.0.0, <4.0.10.16", "4.0.10.15") == True + >>> assert snyk_constraints_satisfied(" >=4.1.0, <4.4.15.7", "4.0.10.15") == False + >>> assert snyk_constraints_satisfied("[3.0.0,3.1.25)", "3.0.2") == True """ 
- snyk_constraints = snyk_constrain.strip().replace(" ", "") + snyk_constraints = snyk_constraint.strip().replace(" ", "") constraints = snyk_constraints.split(",") for constraint in constraints: snyk_comparator, snyk_version = parse_constraint(constraint) @@ -117,7 +117,7 @@ def snky_constraints_satisfied(snyk_constrain, version): return True -def gitlab_constraints_satisfied(gitlab_constrain, version): +def gitlab_constraints_satisfied(gitlab_constraint, version): """ Return True or False depending on whether the given version satisfies the gitlab constraint For example: @@ -128,7 +128,7 @@ def gitlab_constraints_satisfied(gitlab_constrain, version): >>> assert gitlab_constraints_satisfied( ">=1.5,<1.5.2", "2.2") == False """ - gitlab_constraints = gitlab_constrain.strip() + gitlab_constraints = gitlab_constraint.strip() if gitlab_constraints.startswith(("[", "(")): # transform "[7.0.0,7.0.11),[7.2.0,7.2.4)" -> [ "[7.0.0,7.0.11)", "[7.2.0,7.2.4)" ] splitted = gitlab_constraints.split(",") @@ -144,10 +144,10 @@ def gitlab_constraints_satisfied(gitlab_constrain, version): for constraint in constraints: is_constraint_satisfied = True - for subcontraint in constraint.strip().split(delimiter): - if not subcontraint: + for subconstraint in constraint.strip().split(delimiter): + if not subconstraint: continue - gitlab_comparator, gitlab_version = parse_constraint(subcontraint.strip()) + gitlab_comparator, gitlab_version = parse_constraint(subconstraint.strip()) if not gitlab_version: continue if not compare(