Skip to content

Commit

Permalink
feat(evaluator): skip re-evaluation for systems without packages changed in repos
Browse files Browse the repository at this point in the history

RHINENG-10107
  • Loading branch information
jdobes authored and psegedy committed Aug 13, 2024
1 parent 2e27c78 commit 2650903
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 10 deletions.
10 changes: 10 additions & 0 deletions evaluator/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Vulnerability engine Evaluator component.
"""
import json
import re
from collections import namedtuple
from dataclasses import dataclass
from typing import Dict
Expand All @@ -16,6 +17,7 @@

CFG = Config()
# Prefer the generic prometheus port; otherwise fall back to the evaluator-specific one (as a string).
PROMETHEUS_PORT = CFG.prometheus_port or str(CFG.evaluator_prometheus_port)
# Extracts a package name from a package string. The pattern allows an optional
# leading "<digits>:" prefix, captures the name in the named group "pn" (which may
# not contain ":"), and requires the name to be followed by at least two further
# "-"-separated parts. Because "pn" is greedy, names that themselves contain "-"
# are captured whole as long as two dash-separated parts remain after them.
# NOTE(review): presumably applied to NEVRA-style strings such as
# "name-version-release.arch" -- confirm against the producer of package_list.
PACKAGE_RE = re.compile(r"(([0-9]+):)?(?P<pn>[^:]+)-.*[^-:]+-.*")

# Prometheus timings
VMAAS_EVAL_TIME = Histogram(
Expand All @@ -35,12 +37,20 @@

# Prometheus counts
EVAL_COUNT = Counter("ve_evaluator_evaluations", "Number of evaluations attempted")
EVAL_PERFORMED_COUNT = Counter("ve_evaluator_evaluations_performed", "Number of evaluations performed")
INV_ID_NOT_FOUND = Counter("ve_evaluator_inventory_not_found", "Number of times inventory-id not in SystemPlatform")
UNKNOWN_MSG = Counter("ve_evaluator_unknown_msg", "Number of unrecognized messages delivered from queue")
MESSAGE_PARSE_ERROR = Counter("ve_evaluator_message_parse_error", "# of message parse errors")
VMAAS_ERRORS_SKIP = Counter("ve_evaluator_vmaas_errors_skip", "# of evaluations skipped due to VMaaS errors")
EMPTY_DATA_SKIP = Counter("ve_evaluator_data_skip", "# of evaluations skipped due to empty VMaaS json and rule results")
RECALC_EVENT_ERROR = Counter("ve_evaluator_recalc_event_error", "# of message recalc event errors")
EVALUATED_EARLIER_SKIP = Counter("ve_evaluator_evaluated_earlier_skip", "# of messages skipped because the were evaluated earlier")
RECALC_PACKAGES_UNCHANGED_SKIP = Counter(
"ve_evaluator_recalc_packages_unchanged_skip", "# of messages skipped because no relevant packages changed in repo"
)

# recalc event item
RecalcEvent = namedtuple("RecalcEvent", ["created", "changed_packages"])
# single member inside rule cache
RuleCache = namedtuple("RuleCache", ["id", "playbook_count"])
# single member inside cve cache
Expand Down
3 changes: 2 additions & 1 deletion evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ async def _consume_message(self, msg: ConsumerRecord):
org_id = msg_dict["host"]["org_id"]
request_id = msg_dict.get("platform_metadata", {}).get("request_id")
timestamp = parser.parse(ts) if (ts := msg_dict.get("timestamp")) else None
recalc_event_id = msg_dict.get("recalc_event_id")

try:
msg_type = EvaluatorMessageType(msg_dict.get("type"))
except ValueError:
LOGGER.error("received unknown message type: %s", msg_type)
return

await self.processor.evaluate_system(inventory_id, org_id, request_id, timestamp)
await self.processor.evaluate_system(inventory_id, org_id, request_id, timestamp, recalc_event_id=recalc_event_id)

async def consume_message(self, msg: ConsumerRecord):
"""Consume message for evaluation, wrapper for semaphore"""
Expand Down
4 changes: 1 addition & 3 deletions evaluator/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,12 @@ async def _insert_rule(self, rule: str, rule_cves: [str], conn: AsyncConnection,
return row["id"]

@time(EVAL_PART_TIME.labels(part="vmaas_request"))
async def _perform_vmaas_request(self, vmaas_req: str) -> (List[CveAdvisories], List[CveAdvisories], List[CveUnpatched]):
async def _perform_vmaas_request(self, vmaas_json: dict) -> (List[CveAdvisories], List[CveAdvisories], List[CveUnpatched]):
"""Perform VMAAS request for package based evaluation"""
playbook_cves = []
manually_fixable_cves = []
unpatched_cves = []

vmaas_json = json.loads(vmaas_req)

if not vmaas_json.get("package_list", []) or not vmaas_json.get("repository_list", []):
return playbook_cves, manually_fixable_cves, unpatched_cves

Expand Down
82 changes: 76 additions & 6 deletions evaluator/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
Processing logic of grouped evaluator message for single system
"""
import asyncio
import json
from datetime import datetime
from datetime import timedelta
from typing import Dict
from typing import List
from typing import Optional
Expand All @@ -17,9 +19,15 @@
from .common import EMPTY_DATA_SKIP
from .common import EVAL_COUNT
from .common import EVAL_PART_TIME
from .common import EVAL_PERFORMED_COUNT
from .common import EVAL_TIME
from .common import EVALUATED_EARLIER_SKIP
from .common import EvaluatorException
from .common import INV_ID_NOT_FOUND
from .common import PACKAGE_RE
from .common import RECALC_EVENT_ERROR
from .common import RECALC_PACKAGES_UNCHANGED_SKIP
from .common import RecalcEvent
from .common import SystemPlatform
from .common import SystemVulnerabilitiesRow
from .common import VMAAS_ERRORS_SKIP
Expand Down Expand Up @@ -56,11 +64,63 @@ def __init__(
self.remediations_results = remediations_results
self.evaluator_results = evaluator_results
self.payload_tracker = payload_tracker
self.recent_recalc_events: Dict[int, RecalcEvent] = {}

async def init(self):
    """Complete the asynchronous part of construction by initialising the evaluator logic."""
    logic = self.evaluator_logic
    await logic.init()

async def _get_changed_packages(self, recalc_event_id: Optional[int], conn: AsyncConnection) -> Optional[dict]:
"""Get changed packages for a recalc event from cache or fetch it from DB."""
if recalc_event_id is None:
return None
if recalc_event_id not in self.recent_recalc_events:
# Cleanup old recalc events first
deadline = datetime.now() - timedelta(days=30)
for rc_e_id in list(self.recent_recalc_events):
created = self.recent_recalc_events[rc_e_id].created
if created < deadline:
del self.recent_recalc_events[rc_e_id]
LOGGER.info("Unloaded old recalc event id: %s", rc_e_id)
# Fetch single recalc event from DB and put it into cache
async with conn.cursor(row_factory=dict_row) as cur:
await cur.execute(
"""
SELECT created, changed_packages
FROM recalc_event
WHERE id = %s
""",
(recalc_event_id,),
)
row = await cur.fetchone()
if row:
try:
changed_packages = json.loads(row["changed_packages"].decode("utf-8"))
self.recent_recalc_events[recalc_event_id] = RecalcEvent(row["created"], changed_packages)
LOGGER.info("Loaded recalc event id: %s", recalc_event_id)
except json.decoder.JSONDecodeError:
LOGGER.error("Unable to decode JSON: %s", row["changed_packages"])
RECALC_EVENT_ERROR.inc()
return None
else:
LOGGER.error("Recalc event id not found: %s", recalc_event_id)
RECALC_EVENT_ERROR.inc()
return None
return self.recent_recalc_events[recalc_event_id].changed_packages

async def _needs_recalc(self, system_platform: SystemPlatform, changed_packages: dict) -> bool:
"""Returns True if there is at least one package name in system profile which changed in current recalc event."""
if system_platform.vmaas_json:
relevant_changed_repos = set(changed_packages).intersection(system_platform.vmaas_json.get("repository_list", []))
for pkg in system_platform.vmaas_json.get("package_list", []):
pkg_match = PACKAGE_RE.match(pkg)
if pkg_match:
pkg_name = pkg_match.group("pn")
for repo in relevant_changed_repos:
if pkg_name in changed_packages[repo]:
return True
return False

async def _lock_system(self, inventory_id: str, conn: AsyncConnection) -> SystemPlatform:
"""Lock system for update and return its row"""
async with conn.cursor(row_factory=dict_row) as cur:
Expand All @@ -77,9 +137,8 @@ async def _lock_system(self, inventory_id: str, conn: AsyncConnection) -> System
)
row = await cur.fetchone()
if row:
return SystemPlatform(
row["id"], inventory_id, row["rh_account_id"], row["vmaas_json"], row["rule_results"], row["last_evaluation"]
)
vmaas_json = json.loads(row["vmaas_json"]) if row["vmaas_json"] else None
return SystemPlatform(row["id"], inventory_id, row["rh_account_id"], vmaas_json, row["rule_results"], row["last_evaluation"])
return None

@time(EVAL_PART_TIME.labels(part="load_system_vulnerabilities"))
Expand Down Expand Up @@ -227,9 +286,12 @@ async def _compare_sys_vulns(

return to_insert, to_update, to_delete

async def _evaluate_system(self, inventory_id: str, org_id: str, request_timestamp: Optional[datetime]):
async def _evaluate_system(
self, inventory_id: str, org_id: str, request_timestamp: Optional[datetime], recalc_event_id: Optional[int] = None
):
"""Evaluate vulnerabilities for single system, and update DB"""
async with self.db_pool.connection() as conn:
changed_packages = await self._get_changed_packages(recalc_event_id, conn)
async with conn.transaction():
system_platform = await self._lock_system(inventory_id, conn)
if not system_platform:
Expand All @@ -242,8 +304,13 @@ async def _evaluate_system(self, inventory_id: str, org_id: str, request_timesta
)
return
if request_timestamp and system_platform.last_evaluation and request_timestamp < system_platform.last_evaluation:
EVALUATED_EARLIER_SKIP.inc()
LOGGER.info("skipping evaluation, kafka message is older than system was lastly evaluated")
return
if changed_packages and not self._needs_recalc(system_platform, changed_packages):
RECALC_PACKAGES_UNCHANGED_SKIP.inc()
LOGGER.info("skipping re-evaluation, no system packages changed in repos")
return

# start both task asynchronously to speed up
sys_vuln_rows_db, sys_vuln_rows = await asyncio.gather(
Expand Down Expand Up @@ -272,15 +339,18 @@ async def _evaluate_system(self, inventory_id: str, org_id: str, request_timesta

send_remediations_update(self.remediations_results, inventory_id, fixable_sys_vuln_rows)
send_notifications(self.evaluator_results, new_system_vulns, [], [], system_platform.rh_account_id, org_id)
EVAL_PERFORMED_COUNT.inc()

async def evaluate_system(self, inventory_id: str, org_id: str, request_id: str, request_timestamp: datetime):
async def evaluate_system(
self, inventory_id: str, org_id: str, request_id: str, request_timestamp: datetime, recalc_event_id: Optional[int] = None
):
"""Evaluate single system"""
EVAL_COUNT.inc()
msg = {"platform_metadata": {"request_id": request_id}, "host": {"org_id": org_id, "id": inventory_id}}
try:
with EVAL_TIME.time():
LOGGER.info("evaluating system: %s, org_id: %s", inventory_id, org_id)
await self._evaluate_system(inventory_id, org_id, request_timestamp)
await self._evaluate_system(inventory_id, org_id, request_timestamp, recalc_event_id=recalc_event_id)
except EvaluatorException as ex:
LOGGER.error(str(ex))
send_msg_to_payload_tracker(self.payload_tracker, msg, "error", status_msg="evaluation failed", loop=self.loop)
Expand Down

0 comments on commit 2650903

Please sign in to comment.