Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved code style and fixed typos #1157

Merged
merged 14 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion vulntotal/datasources/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,21 @@ class DepsDataSource(DataSource):

def fetch_json_response(self, url):
response = requests.get(url)
if not response.status_code == 200 or response.text == "Not Found":
if response.status_code != 200 or response.text == "Not Found":
logger.error(f"Error while fetching {url}")
return
return response.json()

def datasource_advisory(self, purl) -> Iterable[VendorData]:
"""
Fetch and parse advisories from a given purl.

Parameters:
purl: A string representing the package URL.

Returns:
A list of VendorData objects containing the advisory information.
"""
payload = generate_meta_payload(purl)
response = self.fetch_json_response(payload)
if response:
Expand All @@ -57,6 +66,15 @@ def supported_ecosystem(cls):


def parse_advisory(advisory) -> Iterable[VendorData]:
"""
Parse an advisory into a VendorData object.

Parameters:
advisory: A dictionary representing the advisory data.

Yields:
VendorData instance containing purl, aliases, affected_versions and fixed_versions.
"""
package = advisory["packages"][0]
affected_versions = [event["version"] for event in package["versionsAffected"]]
fixed_versions = [event["version"] for event in package["versionsUnaffected"]]
Expand All @@ -68,6 +86,15 @@ def parse_advisory(advisory) -> Iterable[VendorData]:


def parse_advisories_from_meta(advisories_metadata):
"""
Parse advisories from a given metadata.

Parameters:
advisories_metadata: A dictionary representing the metadata of the advisories.

Returns:
A list of dictionaries, each representing an advisory.
"""
advisories = []
dependencies = advisories_metadata.get("dependencies") or []
for dependency in dependencies:
Expand All @@ -82,6 +109,15 @@ def generate_advisory_payload(advisory_meta):


def generate_meta_payload(purl):
"""
Generate a payload for fetching advisories metadata from a given purl.

Parameters:
purl: A PackageURL object representing the package URL.

Returns:
A string representing the payload for fetching advisories metadata. It should be a valid URL that contains the ecosystem, package name and package version of the dependency.
"""
url_advisories_meta = "https://deps.dev/_/s/{ecosystem}/p/{package}/v/{version}/dependencies"
supported_ecosystem = DepsDataSource.supported_ecosystem()
if purl.type in supported_ecosystem:
Expand Down
2 changes: 1 addition & 1 deletion vulntotal/datasources/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def supported_ecosystem(cls):
def parse_advisory(interesting_edges) -> Iterable[VendorData]:
for edge in interesting_edges:
node = edge["node"]
aliases = [aliase["value"] for aliase in get_item(node, "advisory", "identifiers")]
aliases = [alias["value"] for alias in get_item(node, "advisory", "identifiers")]
affected_versions = node["vulnerableVersionRange"].strip().replace(" ", "").split(",")
parsed_fixed_versions = get_item(node, "firstPatchedVersion", "identifier")
fixed_versions = [parsed_fixed_versions] if parsed_fixed_versions else []
Expand Down
55 changes: 50 additions & 5 deletions vulntotal/datasources/gitlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ class GitlabDataSource(DataSource):
license_url = "TODO"

def datasource_advisory(self, purl) -> Iterable[VendorData]:
"""
Fetches advisories for a given purl from the GitLab API.

Parameters:
purl: A PackageURL instance representing the package to query.

Yields:
VendorData instance containing the advisory information for the package.
"""
package_slug = get_package_slug(purl)
location = download_subtree(package_slug, speculative_execution=True)
if not location:
Expand Down Expand Up @@ -59,6 +68,15 @@ def supported_ecosystem(cls):


def get_package_slug(purl):
"""
Constructs a package slug from a given purl.

Parameters:
purl: A PackageURL instance representing the package to query.

Returns:
A string representing the package slug, or None if the purl type is not supported by GitLab.
"""
supported_ecosystem = GitlabDataSource.supported_ecosystem()

if purl.type not in supported_ecosystem:
Expand All @@ -74,6 +92,16 @@ def get_package_slug(purl):


def download_subtree(package_slug: str, speculative_execution=False):
"""
Downloads and extracts a tar file from a given package slug.

Parameters:
package_slug: A string representing the package slug to query.
speculative_execution: A boolean indicating whether to log errors or not.

Returns:
A Path object representing the extracted location, or None if an error occurs.
"""
url = f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/archive/master/gemnasium-db-master.tar.gz?path={package_slug}"
response = fetch(url)
if os.path.getsize(response.location) > 0:
Expand All @@ -90,6 +118,12 @@ def download_subtree(package_slug: str, speculative_execution=False):


def clear_download(location):
"""
Deletes a directory and its contents.

Parameters:
location: A Path object representing the directory to delete.
"""
if location:
shutil.rmtree(location)

Expand Down Expand Up @@ -132,9 +166,9 @@ def get_casesensitive_slug(path, package_slug):
}
]
url = "https://gitlab.com/api/graphql"
hasnext = True
has_next = True

while hasnext:
while has_next:
response = requests.post(url, json=payload).json()
paginated_tree = response[0]["data"]["project"]["repository"]["paginatedTree"]

Expand All @@ -148,13 +182,24 @@ def get_casesensitive_slug(path, package_slug):
return get_gitlab_style_slug(slug_flatpath, package_slug)

payload[0]["variables"]["nextPageCursor"] = paginated_tree["pageInfo"]["endCursor"]
hasnext = paginated_tree["pageInfo"]["hasNextPage"]
has_next = paginated_tree["pageInfo"]["hasNextPage"]


def parse_interesting_advisories(location, version, delete_download=False) -> Iterable[VendorData]:
"""
Parses advisories from YAML files in a given location that match a given version.

Parameters:
location: A Path object representing the location of the YAML files.
version: A string representing the version to check against the affected range.
delete_download: A boolean indicating whether to delete the downloaded files after parsing.

Yields:
VendorData instance containing the advisory information for the package.
"""
path = Path(location)
glob = "**/*.yml"
files = (p for p in path.glob(glob) if p.is_file())
pattern = "**/*.yml"
files = [p for p in path.glob(pattern) if p.is_file()]
for file in sorted(files):
with open(file) as f:
gitlab_advisory = saneyaml.load(f)
Expand Down
44 changes: 31 additions & 13 deletions vulntotal/datasources/oss_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ class OSSDataSource(DataSource):
api_authenticated = "https://ossindex.sonatype.org/api/v3/authorized/component-report"

def fetch_json_response(self, coordinates):
"""Fetch JSON response from OSS Index API for a given list of coordinates.

Parameters:
coordinates: A list of strings representing the package coordinates.

Returns:
A dictionary containing the JSON response from the OSS Index API, or None if the response is unsuccessful or an error occurs while fetching data.
"""
username = os.environ.get("OSS_USERNAME", None)
token = os.environ.get("OSS_TOKEN", None)
auth = None
Expand All @@ -34,20 +42,21 @@ def fetch_json_response(self, coordinates):
auth = (username, token)
url = self.api_authenticated
response = requests.post(url, auth=auth, json={"coordinates": coordinates})

if response.status_code == 200:
try:
response.raise_for_status()
return response.json()
elif response.status_code == 401:
logger.error("Invalid credentials")
elif response.status_code == 429:
msg = (
"Too many requests"
if auth
else "Too many requests: add OSS_USERNAME and OSS_TOKEN in .env file"
)
logger.error(msg)
else:
logger.error(f"unknown status code: {response.status_code} while fetching: {url}")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
logger.error("Invalid credentials")
elif e.response.status_code == 429:
msg = (
"Too many requests"
if auth
else "Too many requests: add OSS_USERNAME and OSS_TOKEN in .env file"
)
logger.error(msg)
else:
logger.error(f"Unknown status code: {e.response.status_code} while fetching: {url}")

def datasource_advisory(self, purl) -> Iterable[VendorData]:
if purl.type not in self.supported_ecosystem():
Expand Down Expand Up @@ -80,6 +89,15 @@ def supported_ecosystem(cls):


def parse_advisory(component) -> Iterable[VendorData]:
"""
Parse component from OSS Index API and yield VendorData.

Parameters:
component: A list containing a dictionary with component details.

Yields:
VendorData instance containing advisory information for the component.
"""
response = component[0]
vulnerabilities = response.get("vulnerabilities") or []
for vuln in vulnerabilities:
Expand Down
50 changes: 37 additions & 13 deletions vulntotal/datasources/osv.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,21 @@ class OSVDataSource(DataSource):
url = "https://api.osv.dev/v1/query"

def fetch_advisory(self, payload):
"""Fetch JSON advisory from OSV API for a given package payload"""
"""
Fetch JSON advisory from OSV API for a given package payload

Parameters:
payload: A dictionary representing the package data to query.

Returns:
A JSON object containing the advisory information for the package, or None if an error occurs while fetching data from the OSV API.
"""

response = requests.post(self.url, data=str(payload))
if not response.status_code == 200:
logger.error(f"Error while fetching {payload}: {response.status_code}")
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.error(f"Error while fetching {payload}: {e}")
return
return response.json()

Expand Down Expand Up @@ -65,6 +75,12 @@ def supported_ecosystem(cls):
def parse_advisory(response) -> Iterable[VendorData]:
"""
Parse response from OSV API and yield VendorData

Parameters:
response: A JSON object containing the response data from the OSV API.

Yields:
VendorData instance containing the advisory information for the package.
"""

for vuln in response.get("vulns") or []:
Expand All @@ -78,17 +94,17 @@ def parse_advisory(response) -> Iterable[VendorData]:

try:
affected_versions.extend(get_item(vuln, "affected", 0, "versions") or [])
except:
pass
except KeyError as e:
keshav-space marked this conversation as resolved.
Show resolved Hide resolved
logger.error(f"Error while parsing affected versions: {e}")

try:
for event in get_item(vuln, "affected", 0, "ranges", 0, "events") or []:
affected_versions.append(event.get("introduced")) if event.get(
"introduced"
) else None
fixed.append(event.get("fixed")) if event.get("fixed") else None
except:
pass
events = get_item(vuln, "affected", 0, "ranges", 0, "events") or []
affected_versions.extend(
[event.get("introduced") for event in events if event.get("introduced")]
)
fixed.extend([event.get("fixed") for event in events if event.get("fixed")])
except KeyError as e:
keshav-space marked this conversation as resolved.
Show resolved Hide resolved
logger.error(f"Error while parsing events: {e}")

yield VendorData(
aliases=sorted(list(set(aliases))),
Expand All @@ -98,7 +114,15 @@ def parse_advisory(response) -> Iterable[VendorData]:


def generate_payload(purl):
"""Generate compatible payload for OSV API from a PURL"""
"""
Generate compatible payload for OSV API from a PURL

Parameters:
purl: A PackageURL instance representing the package to query.

Returns:
A dictionary containing the package data compatible with the OSV API.
"""

supported_ecosystem = OSVDataSource.supported_ecosystem()
payload = {}
Expand Down
Loading