-
Notifications
You must be signed in to change notification settings - Fork 201
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1559 from aboutcode-org/1509-pypa-importer-pipeline
Add base pipeline for importers and migrate PyPa importer to aboutcode pipeline
- Loading branch information
Showing
13 changed files
with
523 additions
and
79 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
# | ||
# Copyright (c) nexB Inc. and others. All rights reserved. | ||
# VulnerableCode is a trademark of nexB Inc. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. | ||
# See https://github.com/nexB/vulnerablecode for support or download. | ||
# See https://aboutcode.org for more information about nexB OSS projects. | ||
# | ||
import logging | ||
from pathlib import Path | ||
from typing import Iterable | ||
|
||
import saneyaml | ||
from fetchcode.vcs import fetch_via_vcs | ||
|
||
from vulnerabilities.importer import AdvisoryData | ||
from vulnerabilities.importers.osv import parse_advisory_data | ||
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline | ||
from vulnerabilities.utils import get_advisory_url | ||
|
||
module_logger = logging.getLogger(__name__) | ||
|
||
|
||
class PyPaImporterPipeline(VulnerableCodeBaseImporterPipeline): | ||
"""Collect advisories from PyPA GitHub repository.""" | ||
|
||
spdx_license_expression = "CC-BY-4.0" | ||
license_url = "https://github.com/pypa/advisory-database/blob/main/LICENSE" | ||
repo_url = "git+https://github.com/pypa/advisory-database" | ||
importer_name = "Pypa Importer" | ||
|
||
@classmethod | ||
def steps(cls): | ||
return ( | ||
cls.clone, | ||
cls.collect_and_store_advisories, | ||
cls.import_new_advisories, | ||
cls.clean_downloads, | ||
) | ||
|
||
def clone(self): | ||
self.log(f"Cloning `{self.repo_url}`") | ||
self.vcs_response = fetch_via_vcs(self.repo_url) | ||
|
||
def advisories_count(self): | ||
vulns_directory = Path(self.vcs_response.dest_dir) / "vulns" | ||
return sum(1 for _ in vulns_directory.rglob("*.yaml")) | ||
|
||
def collect_advisories(self) -> Iterable[AdvisoryData]: | ||
base_directory = Path(self.vcs_response.dest_dir) | ||
vulns_directory = base_directory / "vulns" | ||
self.advisories_count = sum(1 for _ in vulns_directory.rglob("*.yaml")) | ||
|
||
for advisory in vulns_directory.rglob("*.yaml"): | ||
advisory_url = get_advisory_url( | ||
file=advisory, | ||
base_path=base_directory, | ||
url="https://github.com/pypa/advisory-database/blob/main/", | ||
) | ||
advisory_dict = saneyaml.load(advisory.read_text()) | ||
yield parse_advisory_data( | ||
raw_data=advisory_dict, | ||
supported_ecosystems=["pypi"], | ||
advisory_url=advisory_url, | ||
) | ||
|
||
def clean_downloads(self): | ||
if self.vcs_response: | ||
self.log(f"Removing cloned repository") | ||
self.vcs_response.delete() |
Oops, something went wrong.