diff --git a/vulnerabilities/pipelines/__init__.py b/vulnerabilities/pipelines/__init__.py index d2f4453d5..3dd1b8e73 100644 --- a/vulnerabilities/pipelines/__init__.py +++ b/vulnerabilities/pipelines/__init__.py @@ -10,7 +10,6 @@ import logging from datetime import datetime from datetime import timezone -from traceback import format_exc as traceback_format_exc from typing import Iterable from aboutcode.pipeline import BasePipeline @@ -18,8 +17,8 @@ from vulnerabilities.importer import AdvisoryData from vulnerabilities.improver import MAX_CONFIDENCE -from vulnerabilities.pipes.importer import import_advisory -from vulnerabilities.pipes.importer import insert_advisory +from vulnerabilities.models import Advisory +from vulnerabilities.pipes import advisory from vulnerabilities.utils import classproperty module_logger = logging.getLogger(__name__) @@ -83,38 +82,40 @@ def advisories_count(self) -> int: raise NotImplementedError def collect_and_store_advisories(self): - self.new_advisories = [] - collected_advisory_count = 0 progress = LoopProgress(total_iterations=self.advisories_count(), logger=self.log) for advisory in progress.iter(self.collect_advisories()): - new_advisory = insert_advisory( + if _obj := advisory.insert_advisory( advisory=advisory, pipeline_name=self.qualified_name, logger=self.log, - ) - if new_advisory: - self.new_advisories.append(new_advisory) - collected_advisory_count += 1 + ): + collected_advisory_count += 1 self.log(f"Successfully collected {collected_advisory_count:,d} advisories") def import_new_advisories(self): - new_advisories_count = len(self.new_advisories) + new_advisories = Advisory.objects.filter( + created_by=self.qualified_name, + date_imported__isnull=True, + ) + + new_advisories_count = new_advisories.count() + + self.log(f"Importing {new_advisories_count:,d} new advisories") imported_advisory_count = 0 progress = LoopProgress(total_iterations=new_advisories_count, logger=self.log) - for advisory in progress.iter(self.new_advisories): + for advisory in progress.iter(new_advisories.paginated()): self.import_advisory(advisory=advisory) - imported_advisory_count += 1 + if advisory.date_imported: + imported_advisory_count += 1 self.log(f"Successfully imported {imported_advisory_count:,d} new advisories") - def import_advisory(self, advisory) -> None: - if advisory.date_imported: - return + def import_advisory(self, advisory: Advisory) -> int: try: - import_advisory( + advisory.import_advisory( advisory=advisory, pipeline_name=self.qualified_name, confidence=self.advisory_confidence, diff --git a/vulnerabilities/pipes/importer.py b/vulnerabilities/pipes/advisory.py similarity index 94% rename from vulnerabilities/pipes/importer.py rename to vulnerabilities/pipes/advisory.py index a040b4850..4b264481c 100644 --- a/vulnerabilities/pipes/importer.py +++ b/vulnerabilities/pipes/advisory.py @@ -26,28 +26,30 @@ from vulnerabilities.models import Weakness -def insert_advisory(advisory: AdvisoryData, pipeline_name: str, logger: Callable): +def insert_advisory(advisory: AdvisoryData, pipeline_name: str, logger: Callable = None): + obj = None try: - obj, created = Advisory.objects.get_or_create( + obj, _ = Advisory.objects.get_or_create( aliases=advisory.aliases, summary=advisory.summary, affected_packages=[pkg.to_dict() for pkg in advisory.affected_packages], references=[ref.to_dict() for ref in advisory.references], date_published=advisory.date_published, weaknesses=advisory.weaknesses, + url=advisory.url, defaults={ "created_by": pipeline_name, "date_collected": datetime.now(timezone.utc), }, - url=advisory.url, ) - if created: - return obj except Exception as e: - logger( - f"Error while processing {advisory!r} with aliases {advisory.aliases!r}: {e!r} \n {traceback_format_exc()}", - level=logging.ERROR, - ) + if logger: + logger( + f"Error while processing {advisory!r} with aliases {advisory.aliases!r}: {e!r} \n {traceback_format_exc()}", + level=logging.ERROR, + ) + + return obj @transaction.atomic