Skip to content

Commit

Permalink
Fix code style and ignore Bandit false positive
Browse files Browse the repository at this point in the history
  • Loading branch information
pablosnt committed Apr 6, 2024
1 parent 3d80718 commit 05a75c0
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 17 deletions.
7 changes: 6 additions & 1 deletion src/backend/findings/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ def consume(execution: Execution, findings: List[Finding]) -> None:
settings = Settings.objects.first()
if findings:
notifications = [SMTP(), Telegram()]
for platform in [NvdNist(), HackTricks(), CVECrowd(), DefectDojo()] + notifications:
for platform in [
NvdNist(),
HackTricks(),
CVECrowd(),
DefectDojo(),
] + notifications:
platform.process_findings(execution, findings)
for finding in findings:
if settings.auto_fix_findings and finding.is_fixed:
Expand Down
12 changes: 9 additions & 3 deletions src/backend/platforms/cvecrowd/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@ def is_available(self) -> bool:
return False

def _get_trending_cves(self) -> List[str]:
if self.integration.enabled and self.settings.secret and len(self.trending_cves) == 0:
if (
self.integration.enabled
and self.settings.secret
and len(self.trending_cves) == 0
):
try:
self.trending_cves = self._request(
self.url,
headers={"Authorization": f"Bearer {self.settings.secret}"},
params={"days": self.settings.trending_span_days},
)
except Exception:
except Exception: # nosec
pass
return self.trending_cves

Expand Down Expand Up @@ -64,7 +68,9 @@ def monitor(cls) -> None:
logger.warn("[CVE Crowd - Monitor] No trending CVEs found")
return
already_trending_queryset = Vulnerability.objects.filter(trending=True).all()
already_trending_cves = list(already_trending_queryset.values_list("cve", flat=True))
already_trending_cves = list(
already_trending_queryset.values_list("cve", flat=True)
)
already_trending_queryset.exclude(cve__in=trending_cves).update(trending=False)
Vulnerability.objects.filter(trending=False, cve__in=trending_cves).update(
trending=True
Expand Down
2 changes: 1 addition & 1 deletion src/backend/platforms/mail/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def _notify_alert(self, users: List[Any], alert: Alert, finding: Finding) -> Non
subjects = {
AlertMode.NEW: f"New {finding.__class__.__name__.lower()} detected",
AlertMode.FILTER.value: f"New {finding.__class__.__name__.lower()} matches alert criteria",
AlertMode.MONITOR.value: f"New trending CVE",
AlertMode.MONITOR.value: "New trending CVE",
}
self._notify(
users,
Expand Down
47 changes: 35 additions & 12 deletions src/backend/tests/platforms/test_cvecrowd.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,67 @@ def setUp(self) -> None:
super().setUp()
self._setup_tasks_and_executions()
self.not_trending = Vulnerability.objects.create(
name="not trending", description="not trending", cve="CVE-2023-9999", severity=Severity.LOW
name="not trending",
description="not trending",
cve="CVE-2023-9999",
severity=Severity.LOW,
)
self.trending = Vulnerability.objects.create(
name="trending", description="trending", cve="CVE-2022-1113", severity=Severity.HIGH
name="trending",
description="trending",
cve="CVE-2022-1113",
severity=Severity.HIGH,
)
self.not_trending.executions.add(self.execution3)
self.trending.executions.add(self.execution3)
Alert.objects.create(project=self.execution3.task.target.project, item=AlertItem.CVE, mode=AlertMode.MONITOR, enabled=True)
Alert.objects.create(
project=self.execution3.task.target.project,
item=AlertItem.CVE,
mode=AlertMode.MONITOR,
enabled=True,
)
self.settings = CVECrowdSettings.objects.first()
self.settings.secret = "fake-token"
self.settings.save(update_fields=["_api_token"])
self.cvecrowd = CVECrowd()

@mock.patch("platforms.cvecrowd.integrations.CVECrowd._request", success)
def test_process_findings(self) -> None:
self.cvecrowd.process_findings(self.execution3, [self.trending, self.not_trending])
self.cvecrowd.process_findings(
self.execution3, [self.trending, self.not_trending]
)
self.assertTrue(self.trending.trending)
self.assertFalse(self.not_trending.trending)

@mock.patch("platforms.cvecrowd.integrations.CVECrowd._request", lambda *args, **kwargs : [])
def test_process_findings(self) -> None:
self.cvecrowd.process_findings(self.execution3, [self.trending, self.not_trending])
@mock.patch(
"platforms.cvecrowd.integrations.CVECrowd._request", lambda *args, **kwargs: []
)
def test_process_findings_not_found(self) -> None:
self.cvecrowd.process_findings(
self.execution3, [self.trending, self.not_trending]
)
self.assertFalse(self.trending.trending)
self.assertFalse(self.not_trending.trending)

@mock.patch("platforms.cvecrowd.integrations.CVECrowd._request", success)
def test_monitor(self) -> None:
CVECrowd.monitor()
for vulnerability, trending in [(self.trending, True), (self.not_trending, False)]:
for vulnerability, trending in [
(self.trending, True),
(self.not_trending, False),
]:
vulnerability = Vulnerability.objects.get(pk=vulnerability.id)
self.assertEquals(trending, vulnerability.trending)


new_settings = {
"api_token": "cve-crowd-token",
"trending_span_days": 3,
"execute_per_execution": False
"execute_per_execution": False,
}
invalid_settings = {**new_settings, "trending_span_days": 10}


class CVECrowdSettingsTest(ApiTest):
endpoint = "/api/cvecrowd/1/"
expected_str = "CVE Crowd"
Expand All @@ -73,10 +94,12 @@ class CVECrowdSettingsTest(ApiTest):
"id": 1,
"api_token": None,
"trending_span_days": 7,
"execute_per_execution": True
"execute_per_execution": True,
},
),
ApiTestCase(["auditor1", "auditor2", "reader1", "reader2"], "put", 403, new_settings),
ApiTestCase(
["auditor1", "auditor2", "reader1", "reader2"], "put", 403, new_settings
),
ApiTestCase(["admin1", "admin2"], "put", 400, invalid_settings),
ApiTestCase(
["admin1", "admin2"],
Expand Down Expand Up @@ -104,4 +127,4 @@ class CVECrowdSettingsTest(ApiTest):
]

def _get_object(self) -> Any:
return CVECrowdSettings.objects.first()
return CVECrowdSettings.objects.first()

0 comments on commit 05a75c0

Please sign in to comment.