Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests for vulnerability report and fix first_seen #2462

Merged
merged 6 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions rocky/reports/report_types/vulnerability_report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ def generate_data(self, input_ooi: str, valid_time: datetime) -> Dict[str, Dict[
data = {}
critical = RiskLevelSeverity.CRITICAL.value

first_seen = "-"
last_seen = "-"

findings = self.get_findings(input_ooi, valid_time)

for ip, findings_data in findings.items():
Expand All @@ -110,6 +107,8 @@ def generate_data(self, input_ooi: str, valid_time: datetime) -> Dict[str, Dict[

if time_history:
first_seen = time_history[0]
else:
first_seen = "-"

origins = self.octopoes_api_connector.list_origins(result=finding.ooi, valid_time=valid_time)
sources = ", ".join([origin.method for origin in origins])
Expand All @@ -119,7 +118,7 @@ def generate_data(self, input_ooi: str, valid_time: datetime) -> Dict[str, Dict[
filtered_findings[finding.primary_key] = {
str(_("Source")): sources,
str(_("First seen")): first_seen,
str(_("Last seen")): last_seen,
str(_("Last seen")): "-",
str(_("Evidence")): evidence,
}

Expand Down
124 changes: 120 additions & 4 deletions rocky/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
import io
import json
import logging
from datetime import datetime, timezone
from ipaddress import IPv4Address
from os import urandom
from pathlib import Path
from typing import Dict
from typing import Dict, List, Optional, Union
from unittest.mock import MagicMock, patch
from uuid import UUID

import pytest
from django.conf import settings
Expand All @@ -26,9 +29,12 @@
OrganizationMember,
)

from octopoes.models import DeclaredScanProfile, Reference, ScanLevel
from octopoes.models.ooi.findings import Finding, KATFindingType, RiskLevelSeverity
from octopoes.models.ooi.network import Network
from octopoes.models import OOI, DeclaredScanProfile, Reference, ScanLevel
from octopoes.models.ooi.dns.zone import Hostname
from octopoes.models.ooi.findings import CVEFindingType, Finding, KATFindingType, RiskLevelSeverity
from octopoes.models.ooi.network import IPAddressV4, Network
from octopoes.models.origin import Origin, OriginType
from octopoes.models.transaction import TransactionRecord
from rocky.scheduler import Task

LANG_LIST = [code for code, _ in settings.LANGUAGES]
Expand All @@ -37,6 +43,11 @@
logging.getLogger("faker").setLevel(logging.INFO)


@pytest.fixture
def valid_time():
return datetime.now(timezone.utc)


@pytest.fixture(params=LANG_LIST)
def current_language(request):
return request.param
Expand Down Expand Up @@ -428,6 +439,64 @@ def network():
)


@pytest.fixture
def ipaddressv4(network):
return IPAddressV4(network=network.reference, address=IPv4Address("192.0.2.1"))


@pytest.fixture
def hostname(network):
return Hostname(name="example.com", network=network.reference)


@pytest.fixture
def cve_finding_type_2019_8331():
return CVEFindingType(
id="CVE-2019-8331",
description="In Bootstrap before 3.4.1 and 4.3.x before 4.3.1, XSS is possible in the tooltip or "
"popover data-template attribute.",
source="https://cve.circl.lu/cve/CVE-2019-8331",
risk_score=6.1,
risk_severity=RiskLevelSeverity.MEDIUM,
)


@pytest.fixture
def cve_finding_2019_8331():
return Finding(
finding_type=Reference.from_str("CVEFindingType|CVE-2019-8331"),
ooi=Reference.from_str(
"Finding|SoftwareInstance|HostnameHTTPURL|https|internet|mispo.es|443|/|Software|Bootstrap|3.3.7|cpe:/a:getbootstrap:bootstrap|CVE-2019-8331"
),
proof=None,
description="Vulnerability CVE-2019-8331 detected",
reproduce=None,
)


@pytest.fixture
def cve_finding_type_no_score():
return CVEFindingType(
id="CVE-0000-0001",
description="CVE Finding without scopre",
source="https://cve.circl.lu/cve/CVE-0000-0001",
risk_severity=RiskLevelSeverity.UNKNOWN,
)


@pytest.fixture
def cve_finding_no_score():
return Finding(
finding_type=Reference.from_str("CVEFindingType|CVE-0000-0001"),
ooi=Reference.from_str(
"Finding|SoftwareInstance|HostnameHTTPURL|https|internet|mispo.es|443|/|Software|Bootstrap|3.3.7|cpe:/a:getbootstrap:bootstrap|CVE-0000-0001"
),
proof=None,
description="Vulnerability CVE-0000-0001 detected",
reproduce=None,
)


@pytest.fixture
def finding():
return Finding(
Expand Down Expand Up @@ -609,3 +678,50 @@ def mock_scheduler_client_task_list(mocker):
mock_scheduler_client_session.get.return_value = response

return mock_scheduler_client_session


class MockOctopoesAPIConnector:
oois: Dict[Reference, OOI]
queries: Dict[str, Dict[Optional[Union[Reference, str]], List[OOI]]]
valid_time: datetime

def __init__(self, valid_time: datetime):
self.valid_time = valid_time

def get(self, reference: Reference, valid_time: Optional[datetime] = None) -> OOI:
return self.oois[reference]

def query(
self,
path: str,
valid_time: datetime,
source: Optional[Union[Reference, str]] = None,
offset: int = 0,
limit: int = 50,
) -> List[OOI]:
return self.queries[path][source]

def get_history(self, reference: Reference) -> List[TransactionRecord]:
return [
TransactionRecord(
txTime=self.valid_time,
txId=287,
validTime=self.valid_time,
contentHash="636a28da4792b9f5007143bb35bd37d48662df9b",
)
]

def list_origins(
self,
valid_time: Optional[datetime] = None,
source: Optional[Reference] = None,
result: Optional[Reference] = None,
task_id: Optional[UUID] = None,
origin_type: Optional[OriginType] = None,
) -> List[Origin]:
return []


@pytest.fixture
def mock_octopoes_api_connector(valid_time):
return MockOctopoesAPIConnector(valid_time)
Empty file.
93 changes: 93 additions & 0 deletions rocky/tests/reports/test_vulnerability_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from reports.report_types.vulnerability_report.report import VulnerabilityReport


def test_vulnerability_report_no_findings(mock_octopoes_api_connector, valid_time, ipaddressv4):
mock_octopoes_api_connector.oois = {
ipaddressv4.reference: ipaddressv4,
}
mock_octopoes_api_connector.queries = {
"IPAddress.<address[is IPPort].<ooi[is Finding]": {
ipaddressv4.reference: [],
},
"IPAddress.<address [is ResolvedHostname]"
".hostname.<netloc [is HostnameHTTPURL]>.<ooi [is SoftwareInstance].<ooi [is Finding]": {
ipaddressv4.reference: [],
},
}

report = VulnerabilityReport(mock_octopoes_api_connector)

data = report.generate_data(str(ipaddressv4.reference), valid_time)

assert data[str(ipaddressv4.reference)]["vulnerabilities"] == {}
assert data[str(ipaddressv4.reference)]["summary"]["total_findings"] == 0


def test_vulnerability_report_single_finding(
mock_octopoes_api_connector, valid_time, ipaddressv4, hostname, cve_finding_2019_8331, cve_finding_type_2019_8331
):
mock_octopoes_api_connector.oois = {
hostname.reference: hostname,
}
mock_octopoes_api_connector.queries = {
"Hostname.<hostname[is ResolvedHostname].address": {
hostname.reference: [ipaddressv4],
},
"IPAddress.<address[is IPPort].<ooi[is Finding]": {
ipaddressv4.reference: [],
},
"IPAddress.<address [is ResolvedHostname]"
".hostname.<netloc [is HostnameHTTPURL]>.<ooi [is SoftwareInstance].<ooi [is Finding]": {
ipaddressv4.reference: [cve_finding_2019_8331],
},
"Finding.finding_type": {
cve_finding_2019_8331.reference: [cve_finding_type_2019_8331],
},
}

report = VulnerabilityReport(mock_octopoes_api_connector)

data = report.generate_data(str(hostname.reference), valid_time)

assert data[str(ipaddressv4.reference)]["vulnerabilities"]["CVE-2019-8331"]["cvss"]["score"] == 6.1
assert data[str(ipaddressv4.reference)]["summary"]["total_criticals"] == 0
assert data[str(ipaddressv4.reference)]["summary"]["total_findings"] == 1


def test_vulnerability_report_finding_no_score(
mock_octopoes_api_connector,
valid_time,
ipaddressv4,
hostname,
cve_finding_2019_8331,
cve_finding_type_2019_8331,
cve_finding_no_score,
cve_finding_type_no_score,
):
mock_octopoes_api_connector.oois = {
hostname.reference: hostname,
}
mock_octopoes_api_connector.queries = {
"Hostname.<hostname[is ResolvedHostname].address": {
hostname.reference: [ipaddressv4],
},
"IPAddress.<address[is IPPort].<ooi[is Finding]": {
ipaddressv4.reference: [],
},
"IPAddress.<address [is ResolvedHostname]"
".hostname.<netloc [is HostnameHTTPURL]>.<ooi [is SoftwareInstance].<ooi [is Finding]": {
ipaddressv4.reference: [cve_finding_2019_8331, cve_finding_no_score],
},
"Finding.finding_type": {
cve_finding_2019_8331.reference: [cve_finding_type_2019_8331],
cve_finding_no_score.reference: [cve_finding_type_no_score],
},
}

report = VulnerabilityReport(mock_octopoes_api_connector)

data = report.generate_data(str(hostname.reference), valid_time)

assert data[str(ipaddressv4.reference)]["vulnerabilities"]["CVE-2019-8331"]["cvss"]["score"] == 6.1
assert data[str(ipaddressv4.reference)]["summary"]["total_criticals"] == 0
assert data[str(ipaddressv4.reference)]["summary"]["total_findings"] == 2