Skip to content

Commit

Permalink
Refactor database migration unit tests.
Browse files Browse the repository at this point in the history
Prepares for #3130.
  • Loading branch information
fniessink committed May 21, 2024
1 parent 7b6d4a2 commit e0620d5
Showing 1 changed file with 90 additions and 162 deletions.
252 changes: 90 additions & 162 deletions components/api_server/tests/initialization/test_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,212 +3,140 @@
from initialization.migrations import perform_migrations

from tests.base import DataModelTestCase
from tests.fixtures import REPORT_ID, SUBJECT_ID, METRIC_ID, METRIC_ID2, METRIC_ID3, SOURCE_ID, SOURCE_ID2
from tests.fixtures import SourceId, REPORT_ID, SUBJECT_ID, METRIC_ID, METRIC_ID2, METRIC_ID3, SOURCE_ID, SOURCE_ID2


class DatabaseMigrationsChangeAccessibilityViolationsTest(DataModelTestCase):
class MigrationTestCase(DataModelTestCase):
"""Base class for migration unit tests."""

def existing_report(self, metric_type: str):
"""Return a report fixture. To be extended in subclasses."""
return {
"_id": "id",
"report_uuid": REPORT_ID,
"subjects": {SUBJECT_ID: {"metrics": {METRIC_ID: {"type": metric_type}}}},
}

def inserted_report(self, **kwargs):
"""Return a report as it is expected to have been inserted into the reports collection.
By default, this is the name as the existing report, except for the _id being removed.
"""
report = self.existing_report(**kwargs)
del report["_id"]
return report


class ChangeAccessibilityViolationsTest(MigrationTestCase):
"""Unit tests for the accessibility violations database migration."""

def test_change_accessibility_violations_to_violations_without_reports(self):
def existing_report(
self,
*,
metric_type: str = "accessibility",
metric_name: str = "",
metric_unit: str = "",
extra_metrics: bool = False,
):
"""Extend to add name and unit to the metric and optional extra metrics."""
report = super().existing_report(metric_type=metric_type)
report["subjects"][SUBJECT_ID]["metrics"][METRIC_ID]["name"] = metric_name
report["subjects"][SUBJECT_ID]["metrics"][METRIC_ID]["unit"] = metric_unit
if extra_metrics:
report["subjects"][SUBJECT_ID]["metrics"][METRIC_ID2] = {"type": "violations"}
report["subjects"][SUBJECT_ID]["metrics"][METRIC_ID3] = {"type": "security_warnings"}
return report

def inserted_report(
self, metric_name: str = "Accessibility violations", metric_unit: str = "accessibility violations", **kwargs
):
"""Return a report as it is expected to have been inserted into the reports collection."""
return super().inserted_report(
metric_type="violations", metric_name=metric_name, metric_unit=metric_unit, **kwargs
)

def test_no_reports(self):
"""Test that the migration succeeds without reports."""
self.database.reports.find.return_value = []
perform_migrations(self.database)
self.database.reports.replace_one.assert_not_called()

def test_change_accessibility_violations_to_violations_when_report_has_no_accessibility_metrics(self):
"""Test that the migration succeeds wtih reports, but without accessibility metrics."""
self.database.reports.find.return_value = [
{"report_uuid": REPORT_ID, "subjects": {SUBJECT_ID: {"metrics": {METRIC_ID: {"type": "loc"}}}}}
]
def test_report_without_accessibility_metrics(self):
"""Test that the migration succeeds with reports, but without accessibility metrics."""
self.database.reports.find.return_value = [self.existing_report(metric_type="loc")]
perform_migrations(self.database)
self.database.reports.replace_one.assert_not_called()

def test_change_accessibility_violations_to_violations_when_report_has_an_accessibility_metric(self):
def test_report_with_accessibility_metric(self):
"""Test that the migration succeeds with an accessibility metric."""
self.database.reports.find.return_value = [
{
"_id": "id",
"report_uuid": REPORT_ID,
"subjects": {SUBJECT_ID: {"metrics": {METRIC_ID: {"type": "accessibility"}}}},
},
]
self.database.reports.find.return_value = [self.existing_report()]
perform_migrations(self.database)
self.database.reports.replace_one.assert_called_once_with(
{"_id": "id"},
{
"report_uuid": REPORT_ID,
"subjects": {
SUBJECT_ID: {
"metrics": {
METRIC_ID: {
"type": "violations",
"name": "Accessibility violations",
"unit": "accessibility violations",
}
}
}
},
},
)
self.database.reports.replace_one.assert_called_once_with({"_id": "id"}, self.inserted_report())

def test_change_accessibility_violations_to_violations_when_metric_has_name_and_unit(self):
def test_accessibility_metric_with_name_and_unit(self):
"""Test that the migration succeeds with an accessibility metric, and existing name and unit are kept."""
self.database.reports.find.return_value = [
{
"_id": "id",
"report_uuid": REPORT_ID,
"subjects": {
SUBJECT_ID: {"metrics": {METRIC_ID: {"type": "accessibility", "name": "name", "unit": "unit"}}},
},
},
]
self.database.reports.find.return_value = [self.existing_report(metric_name="name", metric_unit="unit")]
perform_migrations(self.database)
self.database.reports.replace_one.assert_called_once_with(
{"_id": "id"},
{
"report_uuid": REPORT_ID,
"subjects": {
SUBJECT_ID: {
"metrics": {
METRIC_ID: {
"type": "violations",
"name": "name",
"unit": "unit",
}
}
}
},
},
self.inserted_report(metric_name="name", metric_unit="unit"),
)

def test_change_accessibility_violations_to_violations_when_report_has_accessibility_metric_and_other_types(self):
def test_report_with_accessibility_metric_and_other_types(self):
"""Test that the migration succeeds with an accessibility metric and other metric types."""
self.database.reports.find.return_value = [
{
"_id": "id",
"report_uuid": REPORT_ID,
"subjects": {
SUBJECT_ID: {
"metrics": {
METRIC_ID: {"type": "accessibility"},
METRIC_ID2: {"type": "violations"},
METRIC_ID3: {"type": "security_warnings"},
}
}
},
}
]
self.database.reports.find.return_value = [self.existing_report(extra_metrics=True)]
perform_migrations(self.database)
self.database.reports.replace_one.assert_called_once_with(
{"_id": "id"},
{
"report_uuid": REPORT_ID,
"subjects": {
SUBJECT_ID: {
"metrics": {
METRIC_ID: {
"type": "violations",
"name": "Accessibility violations",
"unit": "accessibility violations",
},
METRIC_ID2: {
"type": "violations",
},
METRIC_ID3: {
"type": "security_warnings",
},
}
}
},
},
self.inserted_report(extra_metrics=True),
)


class DatabaseMigrationsBranchParameterTest(DataModelTestCase):
class BranchParameterTest(MigrationTestCase):
"""Unit tests for the branch parameter database migration."""

def test_migration_without_reports(self):
def existing_report(
self, metric_type: str = "loc", sources: dict[SourceId, dict[str, str | dict[str, str]]] | None = None
):
"""Extend to add sources and an extra metric without sources."""
report = super().existing_report(metric_type=metric_type)
report["subjects"][SUBJECT_ID]["metrics"][METRIC_ID2] = {"type": "issues"}
if sources:
report["subjects"][SUBJECT_ID]["metrics"][METRIC_ID]["sources"] = sources
return report

def test_no_reports(self):
"""Test that the migration succeeds without reports."""
self.database.reports.find.return_value = []
perform_migrations(self.database)
self.database.reports.replace_one.assert_not_called()

def test_migration_when_report_has_no_metrics_with_sources_with_branch_parameter(self):
def test_report_without_branch_parameter(self):
"""Test that the migration succeeds with reports, but without metrics with a branch parameter."""
self.database.reports.find.return_value = [
{
"report_uuid": REPORT_ID,
"subjects": {
SUBJECT_ID: {
"metrics": {
METRIC_ID: {"type": "issues"},
METRIC_ID2: {"type": "loc"},
},
},
},
},
]
self.database.reports.find.return_value = [self.existing_report()]
perform_migrations(self.database)
self.database.reports.replace_one.assert_not_called()

def test_migration_when_report_has_source_with_branch(self):
def test_report_with_non_empty_branch_parameter(self):
"""Test that the migration succeeds when the branch parameter is not empty."""
self.database.reports.find.return_value = [
{
"report_uuid": REPORT_ID,
"subjects": {
SUBJECT_ID: {
"metrics": {
METRIC_ID: {
"type": "loc",
"sources": {SOURCE_ID: {"type": "sonarqube", "parameters": {"branch": "main"}}},
},
},
},
},
},
self.existing_report(sources={SOURCE_ID: {"type": "sonarqube", "parameters": {"branch": "main"}}})
]
perform_migrations(self.database)
self.database.reports.replace_one.assert_not_called()

def test_migration_when_report_has_sonarqube_metric_with_branch_without_value(self):
def test_report_with_branch_parameter_without_value(self):
"""Test that the migration succeeds when a source has an empty branch parameter."""
self.database.reports.find.return_value = [
{
"_id": "id",
"report_uuid": REPORT_ID,
"subjects": {
SUBJECT_ID: {
"metrics": {
METRIC_ID: {
"type": "source_up_to_dateness",
"sources": {
SOURCE_ID: {"type": "gitlab", "parameters": {"branch": ""}},
SOURCE_ID2: {"type": "cloc", "parameters": {"branch": ""}},
},
},
},
},
},
},
]
sources = {
SOURCE_ID: {"type": "gitlab", "parameters": {"branch": ""}},
SOURCE_ID2: {"type": "cloc", "parameters": {"branch": ""}},
}
report = self.existing_report(metric_type="source_up_to_dateness", sources=sources)
self.database.reports.find.return_value = [report]
perform_migrations(self.database)
self.database.reports.replace_one.assert_called_once_with(
{"_id": "id"},
{
"report_uuid": REPORT_ID,
"subjects": {
SUBJECT_ID: {
"metrics": {
METRIC_ID: {
"type": "source_up_to_dateness",
"sources": {
SOURCE_ID: {"type": "gitlab", "parameters": {"branch": "master"}},
SOURCE_ID2: {"type": "cloc", "parameters": {"branch": ""}},
},
},
},
},
},
},
)
inserted_sources = {
SOURCE_ID: {"type": "gitlab", "parameters": {"branch": "master"}},
SOURCE_ID2: {"type": "cloc", "parameters": {"branch": ""}},
}
inserted_report = self.inserted_report(metric_type="source_up_to_dateness", sources=inserted_sources)
self.database.reports.replace_one.assert_called_once_with({"_id": "id"}, inserted_report)

0 comments on commit e0620d5

Please sign in to comment.