Skip to content

Commit

Permalink
Make VulnerabilityReference.url unique #818
Browse files Browse the repository at this point in the history
Also validate full_clean in the improve_runner to ensure we do not
have empty, invalid or blank URLs.

Refactor code to add new Manager to VulnerabilityReference and Package
Add convenience method accordingly to create Package from purls

Reference: #818
Co-authored-by: Tushar Goel <[email protected]>
Signed-off-by: Philippe Ombredanne <[email protected]>
  • Loading branch information
pombredanne and TG1999 committed Sep 9, 2022
1 parent caa7268 commit 6d379d0
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 53 deletions.
111 changes: 64 additions & 47 deletions vulnerabilities/improve_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@
from datetime import datetime
from datetime import timezone
from typing import List
from typing import Tuple

from django.core.exceptions import ValidationError
from django.db import transaction

from vulnerabilities import models
from vulnerabilities.importer import PackageURL
from vulnerabilities.improver import Inference
from vulnerabilities.models import Advisory
from vulnerabilities.models import Alias
from vulnerabilities.models import Package
from vulnerabilities.models import PackageRelatedVulnerability
from vulnerabilities.models import Vulnerability
from vulnerabilities.models import VulnerabilityReference
from vulnerabilities.models import VulnerabilityRelatedReference
from vulnerabilities.models import VulnerabilitySeverity

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -63,46 +68,59 @@ def process_inferences(inferences: List[Inference], advisory: Advisory, improver
logger.info(f"Improving advisory id: {advisory.id}")

for inference in inferences:
vuln = get_or_create_vulnerability_and_aliases(
inference.vulnerability_id, inference.aliases, inference.summary
vulnerability = get_or_create_vulnerability_and_aliases(
vulnerability_id=inference.vulnerability_id,
alias_names=inference.aliases,
summary=inference.summary,
)
if not vuln:

if not vulnerability:
logger.warn(f"Unable to get vulnerability for inference: {inference!r}")
continue

for ref in inference.references:
reference, _ = models.VulnerabilityReference.objects.get_or_create(
reference_id=ref.reference_id, url=ref.url

reference = VulnerabilityReference.objects.get_or_none(
reference_id=ref.reference_id,
url=ref.url,
)

models.VulnerabilityRelatedReference.objects.update_or_create(
reference=reference, vulnerability=vuln
if not reference:
reference = create_valid_vulnerability_reference(
reference_id=ref.reference_id,
url=ref.url,
)
if not reference:
continue

VulnerabilityRelatedReference.objects.update_or_create(
reference=reference,
vulnerability=vulnerability,
)

for severity in ref.severities:
_vs, updated = models.VulnerabilitySeverity.objects.update_or_create(
_vs, updated = VulnerabilitySeverity.objects.update_or_create(
scoring_system=severity.system.identifier,
reference=reference,
defaults={"value": str(severity.value)},
)
if updated:
logger.info(f"Severity updated for reference {ref!r} to {severity.value!r}")

if inference.affected_purls:
for pkg in inference.affected_purls:
vulnerable_package, _ = _get_or_create_package(pkg)
models.PackageRelatedVulnerability(
vulnerability=vuln,
package=vulnerable_package,
created_by=improver_name,
confidence=inference.confidence,
fix=False,
).update_or_create()
for affected_purl in inference.affected_purls or []:
vulnerable_package = Package.objects.get_or_create_from_purl(purl=affected_purl)
PackageRelatedVulnerability(
vulnerability=vulnerability,
package=vulnerable_package,
created_by=improver_name,
confidence=inference.confidence,
fix=False,
).update_or_create()

if inference.fixed_purl:
fixed_package, _ = _get_or_create_package(inference.fixed_purl)
models.PackageRelatedVulnerability(
vulnerability=vuln,
fixed_package = Package.objects.get_or_create_from_purl(purl=inference.fixed_purl)
PackageRelatedVulnerability(
vulnerability=vulnerability,
package=fixed_package,
created_by=improver_name,
confidence=inference.confidence,
Expand All @@ -113,26 +131,25 @@ def process_inferences(inferences: List[Inference], advisory: Advisory, improver
advisory.save()


def _get_or_create_package(p: PackageURL) -> Tuple[models.Package, bool]:
query_kwargs = {}
# TODO: this should be revisited as this should best be a model or manager method... and possibly streamlined
query_kwargs = dict(
type=p.type or "",
namespace=p.namespace or "",
name=p.name or "",
version=p.version or "",
qualifiers=p.qualifiers or {},
subpath=p.subpath or "",
def create_valid_vulnerability_reference(url, reference_id=None):
"""
Create and return a new validated VulnerabilityReference from a
``url`` and ``reference_id``.
Return None and log a warning if this is not a valid reference.
"""
reference = VulnerabilityReference(
reference_id=reference_id,
url=url,
)

return models.Package.objects.get_or_create(**query_kwargs)

try:
reference.full_clean()
except ValidationError as e:
logger.warning(f"Invalid vulnerability reference: {reference!r}: {e}")
return

def _package_url_to_package(purl: PackageURL) -> models.Package:
# FIXME: this is likely creating a package from a purl?
p = models.Package()
p.set_package_url(purl)
return p
reference.save()
return reference


def get_or_create_vulnerability_and_aliases(vulnerability_id, alias_names, summary):
Expand All @@ -145,9 +162,9 @@ def get_or_create_vulnerability_and_aliases(vulnerability_id, alias_names, summa
new_alias_names = set()
for alias_name in alias_names:
try:
alias = models.Alias.objects.get(alias=alias_name)
alias = Alias.objects.get(alias=alias_name)
existing_vulns.add(alias.vulnerability)
except models.Alias.DoesNotExist:
except Alias.DoesNotExist:
new_alias_names.add(alias_name)

# If given set of aliases point to different vulnerabilities in the
Expand Down Expand Up @@ -179,14 +196,14 @@ def get_or_create_vulnerability_and_aliases(vulnerability_id, alias_names, summa
vulnerability = existing_alias_vuln
elif vulnerability_id:
try:
vulnerability = models.Vulnerability.objects.get(vulnerability_id=vulnerability_id)
except models.Vulnerability.DoesNotExist:
vulnerability = Vulnerability.objects.get(vulnerability_id=vulnerability_id)
except Vulnerability.DoesNotExist:
logger.warn(
f"Given vulnerability_id: {vulnerability_id} does not exist in the database"
)
return
else:
vulnerability = models.Vulnerability(summary=summary)
vulnerability = Vulnerability(summary=summary)
vulnerability.save()

if summary and summary != vulnerability.summary:
Expand All @@ -196,7 +213,7 @@ def get_or_create_vulnerability_and_aliases(vulnerability_id, alias_names, summa
)

for alias_name in new_alias_names:
alias = models.Alias(alias=alias_name, vulnerability=vulnerability)
alias = Alias(alias=alias_name, vulnerability=vulnerability)
alias.save()
logger.info(f"New alias for {vulnerability!r}: {alias_name}")

Expand Down
41 changes: 41 additions & 0 deletions vulnerabilities/migrations/0025_remove_duplicate_reference_urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from django.db import migrations
from django.db.models import Count
from django.db.models import Max


class Migration(migrations.Migration):
    """
    Data migration deleting VulnerabilityReference rows that share the same
    ``url`` with another row, keeping a single row per URL.
    """

    dependencies = [
        ("vulnerabilities", "0024_alter_all_models_to_add_ordering"),
    ]

    def remove_duplicate_reference_urls(apps, _):
        """
        For every ``url`` used by more than one VulnerabilityReference, delete
        all the rows with that URL except the one with the highest id.
        Any duplication will be reprocessed by reimports if needed to correct
        the relationships.
        """
        reference_model = apps.get_model("vulnerabilities", "VulnerabilityReference")

        # One entry per URL that occurs more than once, carrying the highest
        # id seen for that URL and the number of rows using it.
        duplicated_urls = (
            reference_model.objects.values("url")
            .order_by("url")
            .annotate(max_id=Max("id"), count_id=Count("id"))
            .filter(count_id__gt=1)
        )

        for entry in duplicated_urls:
            # Select every row with this URL, spare the most recent one
            # (highest id) and delete the others.
            same_url_references = reference_model.objects.filter(url=entry["url"])
            same_url_references.exclude(id=entry["max_id"]).delete()

    operations = [
        # The reverse operation is a no-op: deleted rows are not restored.
        migrations.RunPython(remove_duplicate_reference_urls, migrations.RunPython.noop),
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Generated by Django 4.0.7 on 2022-09-09 12:34

from django.db import migrations, models


class Migration(migrations.Migration):
    # Schema migration for the ``vulnerabilityreference`` model: clears its
    # ``unique_together`` option and makes the ``url`` field unique by itself.

    dependencies = [
        # Runs after the migration named below; judging by its name it removes
        # duplicate reference URLs first -- TODO confirm against that file.
        ('vulnerabilities', '0025_remove_duplicate_reference_urls'),
    ]

    operations = [
        # Reset ``unique_together`` to an empty set, i.e. no multi-field
        # uniqueness constraint remains declared on the model.
        migrations.AlterUniqueTogether(
            name='vulnerabilityreference',
            unique_together=set(),
        ),
        # Redefine ``url`` as a URLField (max_length=1024) with unique=True.
        migrations.AlterField(
            model_name='vulnerabilityreference',
            name='url',
            field=models.URLField(help_text='URL to the vulnerability reference', max_length=1024, unique=True),
        ),
    ]
58 changes: 56 additions & 2 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@
import hashlib
import json
import logging
from contextlib import suppress

from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.validators import MaxValueValidator
from django.core.validators import MinValueValidator
from django.db import models
from django.db.models.functions import Length
from django.db.models.functions import Trim
from django.dispatch import receiver
from django.urls import reverse
from packageurl import PackageURL
from packageurl.contrib.django.models import PackageURLMixin
from packageurl.contrib.django.models import PackageURLQuerySet
from packageurl.contrib.django.models import without_empty_values
from rest_framework.authtoken.models import Token

from vulnerabilities.importer import AdvisoryData
Expand All @@ -29,6 +36,18 @@

logger = logging.getLogger(__name__)

models.CharField.register_lookup(Length)
models.CharField.register_lookup(Trim)


class BaseQuerySet(models.QuerySet):
    """
    QuerySet base class adding a ``get_or_none`` lookup helper.
    """

    def get_or_none(self, *args, **kwargs):
        """
        Return the single object matching the given lookup arguments, or
        ``None`` when the lookup raises this model's ``DoesNotExist`` or a
        ``ValidationError``. Any other exception raised by ``get()`` is
        propagated to the caller.
        """
        try:
            return self.get(*args, **kwargs)
        except (self.model.DoesNotExist, ValidationError):
            return None


class Vulnerability(models.Model):
"""
Expand Down Expand Up @@ -111,15 +130,21 @@ class VulnerabilityReference(models.Model):
through="VulnerabilityRelatedReference",
)

url = models.URLField(max_length=1024, help_text="URL to the vulnerability reference")
url = models.URLField(
max_length=1024,
help_text="URL to the vulnerability reference",
unique=True,
)

reference_id = models.CharField(
max_length=200,
help_text="An optional reference ID, such as DSA-4465-1 when available",
blank=True,
)

objects = BaseQuerySet.as_manager()

class Meta:
unique_together = ["url", "reference_id"]
ordering = ["reference_id", "url"]

def __str__(self):
Expand Down Expand Up @@ -147,6 +172,33 @@ class Meta:
ordering = ["vulnerability", "reference"]


class PackageQuerySet(BaseQuerySet, PackageURLQuerySet):
    """
    QuerySet for packages with helpers to look up or create rows from a
    Package URL.
    """

    def get_or_create_from_purl(self, purl: PackageURL):
        """
        Return an existing or new Package (created if needed) given a
        ``purl`` PackageURL.
        """
        purl_fields = without_empty_values(purl.to_dict(encode=True))
        # Use ``self`` rather than a hard-coded ``Package.objects`` so the
        # lookup and creation honor the queryset this method is called on
        # (its model, database alias and any filters already applied).
        package, _ = self.get_or_create(**purl_fields)
        return package

    def for_package_url_object(self, purl):
        """
        Filter the QuerySet with the provided Package URL object or string. The
        ``purl`` string is validated and transformed into filtering lookups. If
        this is a PackageURL object it is reused as-is.
        Return an empty QuerySet for any other type of ``purl``.
        """
        if isinstance(purl, PackageURL):
            lookups = without_empty_values(purl.to_dict(encode=True))
            return self.filter(**lookups)

        if isinstance(purl, str):
            return self.for_package_url(purl)

        return self.none()


class Package(PackageURLMixin):
"""
A software package with related vulnerabilities.
Expand All @@ -168,6 +220,8 @@ class Package(PackageURLMixin):
to="Vulnerability", through="PackageRelatedVulnerability"
)

objects = PackageQuerySet.as_manager()

@property
def purl(self):
return self.package_url
Expand Down
11 changes: 9 additions & 2 deletions vulnerabilities/tests/test_fix_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ def setUp(self):
for i in range(0, 10):
ref, _ = VulnerabilityReference.objects.get_or_create(
reference_id=f"cpe:/a:nginx:{i}",
url=f"https://nvd.nist.gov/vuln/search/results?adv_search=true&isCpeNameSearch=true&query=cpe:/a:nginx:{i}",
)
VulnerabilityRelatedReference.objects.create(
reference=ref, vulnerability=self.vulnerability
Expand Down Expand Up @@ -356,7 +357,10 @@ def setUp(self):
]
vuln = Vulnerability.objects.create(summary="test")
for cpe in self.exclusive_cpes:
ref = VulnerabilityReference.objects.create(reference_id=cpe)
ref = VulnerabilityReference.objects.create(
reference_id=cpe,
url=f"https://nvd.nist.gov/vuln/search/results?adv_search=true&isCpeNameSearch=true&query={cpe}",
)
VulnerabilityRelatedReference.objects.create(reference=ref, vulnerability=vuln)
second_vuln = Vulnerability.objects.create(summary="test-A")
self.non_exclusive_cpes = [
Expand All @@ -370,7 +374,10 @@ def setUp(self):
]
third_vuln = Vulnerability.objects.create(summary="test-B")
for cpe in self.non_exclusive_cpes:
ref = VulnerabilityReference.objects.create(reference_id=cpe)
ref = VulnerabilityReference.objects.create(
reference_id=cpe,
url=f"https://nvd.nist.gov/vuln/search/results?adv_search=true&isCpeNameSearch=true&query={cpe}",
)
VulnerabilityRelatedReference.objects.create(reference=ref, vulnerability=second_vuln)
VulnerabilityRelatedReference.objects.create(reference=ref, vulnerability=third_vuln)

Expand Down
Loading

0 comments on commit 6d379d0

Please sign in to comment.