Skip to content

Commit

Permalink
Allow for filtering Dependency-Track projects.
Browse files Browse the repository at this point in the history
When using Dependency-Track as source for dependencies, security warnings, or source-up-to-dateness, allow for filtering by project name and version.

Closes #8686.
  • Loading branch information
fniessink committed May 22, 2024
1 parent fbe927d commit 0b2b825
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 92 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
"""Dependency-Track base collector."""

from collections.abc import AsyncIterator
from typing import TypedDict, cast

from base_collectors import SourceCollector
from collector_utilities.type import URL
from collector_utilities.functions import match_string_or_regular_expression
from collector_utilities.type import URL, Response
from model import SourceResponses


class DependencyTrackProject(TypedDict):
"""Project as returned by Dependency-Track."""

# Last BOM import is a Unix timestamp, despite the Dependency-Tracker Swagger docs saying it's a datetime string
# See https://github.com/DependencyTrack/dependency-track/issues/840
lastBomImport: int
name: str
uuid: str
version: str


class DependencyTrackBase(SourceCollector):
"""Dependency-Track base class."""

Expand Down Expand Up @@ -40,9 +55,28 @@ async def _get_source_responses(self, *urls: URL) -> SourceResponses:

async def _get_project_uuids(self) -> dict[str, str]:
"""Return a mapping of project UUIDs to project names."""
return {project["uuid"]: project["name"] async for project in self._get_projects()}

async def _get_projects(self) -> AsyncIterator[DependencyTrackProject]:
"""Return the Dependency-Track projects."""
projects_api = URL(await self._api_url() + "/project")
project_uuids = {}
for response in await DependencyTrackBase._get_source_responses(self, projects_api):
projects = await response.json(content_type=None)
project_uuids.update({project["uuid"]: project["name"] for project in projects})
return project_uuids
# We need an async for-loop and yield projects one by one because Python has no `async yield from`,
# see https://peps.python.org/pep-0525/#asynchronous-yield-from
async for project in self._get_projects_from_response(response):
yield project

async def _get_projects_from_response(self, response: Response) -> AsyncIterator[DependencyTrackProject]:
"""Return the projects from the response that match the configured project names and versions."""
project_names = cast(list, self._parameter("project_names"))
project_versions = cast(list, self._parameter("project_versions"))
for project in await response.json(content_type=None):
if self._project_matches(project, project_names, project_versions):
yield project

@staticmethod
def _project_matches(project: DependencyTrackProject, names: list[str], versions: list[str]) -> bool:
"""Return whether the project name matches the project names and versions."""
project_matches_name = match_string_or_regular_expression(project["name"], names) if names else True
project_matches_version = match_string_or_regular_expression(project["version"], versions) if versions else True
return project_matches_name and project_matches_version
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,7 @@
from collector_utilities.type import URL
from model import Entities, Entity, SourceResponses

from .base import DependencyTrackBase


class DependencyTrackProject(TypedDict):
"""Project as returned by Dependency-Track."""

name: str
uuid: str
from .base import DependencyTrackBase, DependencyTrackProject


class DependencyTrackRepositoryMetaData(TypedDict):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
"""Dependency-Track source up-to-dateness collector."""

from datetime import datetime
from typing import TypedDict

from base_collectors import TimePassedCollector
from collector_utilities.date_time import datetime_from_timestamp
from collector_utilities.exceptions import CollectorError
from collector_utilities.type import URL, Response
from model import Entities, Entity, SourceResponses

from .base import DependencyTrackBase


class DependencyTrackProject(TypedDict):
"""Project as returned by Dependency-Track."""

lastBomImport: int # Timestamp
name: str
uuid: str
from .base import DependencyTrackBase, DependencyTrackProject


class DependencyTrackSourceUpToDateness(DependencyTrackBase, TimePassedCollector):
Expand All @@ -28,16 +20,19 @@ async def _api_url(self) -> URL:

async def _parse_source_response_date_time(self, response: Response) -> datetime:
"""Override to parse the timestamp from the response."""
projects = await response.json(content_type=None)
datetimes = [self._last_bom_import_datetime(project) for project in projects]
return self.minimum(datetimes)
if datetimes := [
self._last_bom_import_datetime(project) async for project in self._get_projects_from_response(response)
]:
return self.minimum(datetimes)
error_message = "No projects found"
raise CollectorError(error_message)

async def _parse_entities(self, responses: SourceResponses) -> Entities:
"""Parse the entities from the responses."""
landing_url = str(self._parameter("landing_url")).strip("/")
entities = Entities()
for response in responses:
for project in await response.json(content_type=None):
async for project in self._get_projects_from_response(response):
uuid = project["uuid"]
entity = Entity(
key=uuid,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Unit tests for the Dependency-Track security warnings collector."""

from source_collectors.dependency_track.base import DependencyTrackProject
from source_collectors.dependency_track.dependencies import DependencyTrackComponent

from tests.source_collectors.source_collector_test_case import SourceCollectorTestCase
Expand All @@ -11,24 +12,23 @@ class DependencyTrackDependenciesTest(SourceCollectorTestCase):
METRIC_TYPE = "dependencies"
SOURCE_TYPE = "dependency_track"

def setUp(self) -> None:
"""Extend to set up Dependency-Track projects."""
super().setUp()
self.projects = [{"uuid": "project uuid", "name": "project name"}]
def projects(self) -> list[DependencyTrackProject]:
"""Create the Dependency-Track projects fixture."""
return [DependencyTrackProject(name="project name", uuid="project uuid", version="1.4", lastBomImport=0)]

def create_dependencies(self, latest_version: str) -> list[DependencyTrackComponent]:
def dependencies(self, latest_version: str) -> list[DependencyTrackComponent]:
"""Create a list of dependencies as returned by Dependency-Track."""
dependency: DependencyTrackComponent = {
"name": "component name",
"project": {"name": "project name", "uuid": "project uuid"},
"project": self.projects()[0],
"uuid": "component-uuid",
"version": "1.0",
}
if latest_version:
dependency["repositoryMeta"] = {"latestVersion": latest_version}
return [dependency]

def create_entities(self, latest_version: str, latest_version_status: str) -> list[dict[str, str]]:
def entities(self, latest_version: str, latest_version_status: str) -> list[dict[str, str]]:
"""Create a list of expected entities."""
return [
{
Expand All @@ -50,26 +50,51 @@ async def test_no_projects(self):

async def test_no_vulnerabilities(self):
"""Test one project without dependencies."""
response = await self.collect(get_request_json_side_effect=[self.projects, []])
response = await self.collect(get_request_json_side_effect=[self.projects(), []])
self.assert_measurement(response, value="0", entities=[])

async def test_updateable_dependencies(self):
"""Test one project with a component that can be updated."""
response = await self.collect(get_request_json_side_effect=[self.projects, self.create_dependencies("1.1")])
self.assert_measurement(response, value="1", entities=self.create_entities("1.1", "update possible"))
response = await self.collect(get_request_json_side_effect=[self.projects(), self.dependencies("1.1")])
self.assert_measurement(response, value="1", entities=self.entities("1.1", "update possible"))

async def test_up_to_date_dependencies(self):
"""Test one project with a component that is up to date."""
response = await self.collect(get_request_json_side_effect=[self.projects, self.create_dependencies("1.0")])
self.assert_measurement(response, value="1", entities=self.create_entities("1.0", "up-to-date"))
response = await self.collect(get_request_json_side_effect=[self.projects(), self.dependencies("1.0")])
self.assert_measurement(response, value="1", entities=self.entities("1.0", "up-to-date"))

async def test_unknown_latest_version(self):
"""Test one project with a component whose latest version is unknown."""
response = await self.collect(get_request_json_side_effect=[self.projects, self.create_dependencies("")])
self.assert_measurement(response, value="1", entities=self.create_entities("unknown", "unknown"))
response = await self.collect(get_request_json_side_effect=[self.projects(), self.dependencies("")])
self.assert_measurement(response, value="1", entities=self.entities("unknown", "unknown"))

async def test_filter_by_latest_version_status(self):
"""Test that components can be filtered by latest version status."""
self.set_source_parameter("latest_version_status", ["update possible"])
response = await self.collect(get_request_json_side_effect=[self.projects, self.create_dependencies("1.0")])
response = await self.collect(get_request_json_side_effect=[self.projects(), self.dependencies("1.0")])
self.assert_measurement(response, value="0", entities=[])

async def test_filter_by_project_name(self):
"""Test filtering projects by name."""
self.set_source_parameter("project_names", ["other project"])
response = await self.collect(get_request_json_return_value=self.projects())
self.assert_measurement(response, value="0", entities=[])

async def test_filter_by_project_regular_expression(self):
"""Test filtering projects by regular expression."""
self.set_source_parameter("project_names", ["project .*"])
response = await self.collect(get_request_json_side_effect=[self.projects(), self.dependencies("1.0")])
self.assert_measurement(response, value="1", entities=self.entities("1.0", "up-to-date"))

async def test_filter_by_project_version(self):
"""Test filtering projects by version."""
self.set_source_parameter("project_versions", ["1.2", "1.3"])
response = await self.collect(get_request_json_return_value=self.projects())
self.assert_measurement(response, value="0", entities=[])

async def test_filter_by_project_name_and_version(self):
"""Test filtering projects by name and version."""
self.set_source_parameter("project_names", ["project .*"])
self.set_source_parameter("project_versions", ["1.3", "1.4"])
response = await self.collect(get_request_json_side_effect=[self.projects(), self.dependencies("1.0")])
self.assert_measurement(response, value="1", entities=self.entities("1.0", "up-to-date"))
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

from aiohttp import BasicAuth

from source_collectors.dependency_track.base import DependencyTrackProject
from source_collectors.dependency_track.security_warnings import (
DependencyTrackComponent,
DependencyTrackFinding,
DependencyTrackVulnerability,
)

from tests.source_collectors.source_collector_test_case import SourceCollectorTestCase


Expand All @@ -11,22 +18,29 @@ class DependencyTrackSecurityWarningsTest(SourceCollectorTestCase):
METRIC_TYPE = "security_warnings"
SOURCE_TYPE = "dependency_track"

def setUp(self) -> None:
"""Extend to set up Dependency-Track responses and expected measurement entities."""
super().setUp()
self.projects = [{"uuid": "project uuid", "name": "project name"}]
self.vulnerabilities = [
{
"component": {"name": "component name", "project": "project uuid", "uuid": "component-uuid"},
"matrix": "matrix",
"vulnerability": {
"description": "vulnerability description",
"severity": "UNASSIGNED",
"vulnId": "CVE-123",
},
},
def projects(self) -> list[DependencyTrackProject]:
"""Create the Dependency-Track projects fixture."""
return [DependencyTrackProject(name="project name", uuid="project uuid", version="1.4", lastBomImport=0)]

def findings(self) -> list[DependencyTrackFinding]:
"""Create the Dependency-Track findings fixture."""
return [
DependencyTrackFinding(
component=DependencyTrackComponent(
name="component name", project="project uuid", uuid="component-uuid"
),
matrix="matrix",
vulnerability=DependencyTrackVulnerability(
description="vulnerability description",
severity="UNASSIGNED",
vulnId="CVE-123",
),
),
]
self.entities = [

def entities(self) -> list[dict[str, str]]:
"""Create the expected entities."""
return [
{
"component": "component name",
"component_landing_url": "/components/component-uuid",
Expand All @@ -46,20 +60,45 @@ async def test_no_projects(self):

async def test_one_project_without_vulnerabilities(self):
"""Test one project without vulnerabilities."""
response = await self.collect(get_request_json_side_effect=[self.projects, []])
response = await self.collect(get_request_json_side_effect=[self.projects(), []])
self.assert_measurement(response, value="0", entities=[])

async def test_one_project_with_vulnerabilities(self):
"""Test one project with vulnerabilities."""
response = await self.collect(get_request_json_side_effect=[self.projects, self.vulnerabilities])
self.assert_measurement(response, value="1", entities=self.entities)
response = await self.collect(get_request_json_side_effect=[self.projects(), self.findings()])
self.assert_measurement(response, value="1", entities=self.entities())

async def test_filter_vulnerabilities(self):
async def test_filter_by_severity(self):
"""Test that vulnerabilities can be filtered."""
self.set_source_parameter("severities", ["High", "Critical"])
response = await self.collect(get_request_json_side_effect=[self.projects, self.vulnerabilities])
response = await self.collect(get_request_json_side_effect=[self.projects(), self.findings()])
self.assert_measurement(response, value="0", entities=[])

async def test_filter_by_project_name(self):
"""Test filtering projects by name."""
self.set_source_parameter("project_names", ["other project"])
response = await self.collect(get_request_json_side_effect=[self.projects(), self.findings()])
self.assert_measurement(response, value="0", entities=[])

async def test_filter_by_project_regular_expression(self):
"""Test filtering projects by regular expression."""
self.set_source_parameter("project_names", ["project .*"])
response = await self.collect(get_request_json_side_effect=[self.projects(), self.findings()])
self.assert_measurement(response, value="1", entities=self.entities())

async def test_filter_by_project_version(self):
"""Test filtering projects by version."""
self.set_source_parameter("project_versions", ["1.2", "1.3"])
response = await self.collect(get_request_json_side_effect=[self.projects(), self.findings()])
self.assert_measurement(response, value="0", entities=[])

async def test_filter_by_project_name_and_version(self):
"""Test filtering projects by name and version."""
self.set_source_parameter("project_names", ["project .*"])
self.set_source_parameter("project_versions", ["1.3", "1.4"])
response = await self.collect(get_request_json_side_effect=[self.projects(), self.findings()])
self.assert_measurement(response, value="1", entities=self.entities())

async def test_api_key(self):
"""Test that the API key is passed as header."""
self.set_source_parameter("private_token", "API key")
Expand Down
Loading

0 comments on commit 0b2b825

Please sign in to comment.