Skip to content

Commit

Permalink
feat(vmaas_sync): Recalc system with updated packages in repos
Browse files Browse the repository at this point in the history
RHINENG-10107
  • Loading branch information
Radovan Hanculak authored and jdobes committed Jun 10, 2024
1 parent b5f63e4 commit 8422f75
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 22 deletions.
3 changes: 3 additions & 0 deletions tests/scripts/vmaas_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@
"product": "Red Hat Enterprise Linux Server",
"revision": "2019-07-17T12:53:14.950647+00:00",
"last_change": "2019-07-17T18:00:00+00:00",
"updated_package_names": [],
}
],
"rhel-7-server-rpms": [
Expand All @@ -444,6 +445,7 @@
"product": "Red Hat Enterprise Linux Server",
"revision": "2019-07-17T12:53:14.950647+00:00",
"last_change": "2019-07-17T14:00:00+00:00",
"updated_package_names": [],
}
],
"vmaas-test-rhel8-module": [
Expand All @@ -456,6 +458,7 @@
"product": "VMaaS testing - modularity",
"revision": "2019-07-17T12:53:14.950647+00:00",
"last_change": "2019-07-17T14:00:00+00:00",
"updated_package_names": [],
}
],
},
Expand Down
95 changes: 73 additions & 22 deletions vmaas_sync/vmaas_sync.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""VMaaS sync module."""
import asyncio
import datetime as dt
import json
import re
from datetime import datetime
from datetime import timezone
from typing import Any
Expand Down Expand Up @@ -30,6 +32,7 @@

# Message-queue writer bound to the topic named by CFG.evaluator_recalc_topic;
# re-evaluation requests are sent through it.
EVALUATOR_QUEUE = mqueue.MQWriter(CFG.evaluator_recalc_topic)
# Bounded semaphore sized by CFG.re_evaluation_kafka_batches; it is released once a
# batch has been sent (or when there are no more rows to send).
# NOTE(review): presumably caps the number of in-flight batches - confirm at the acquire site.
BATCH_SEMAPHORE = asyncio.BoundedSemaphore(CFG.re_evaluation_kafka_batches)
# Matches a package string with an optional numeric "<epoch>:" prefix and captures the
# package name in the named group "pn". The name must be followed by at least two more
# dash-separated parts, and the name itself may contain dashes but no colon.
# NOTE(review): input is presumably a NEVRA string such as "bash-0:4.2.46-34.el7.x86_64" - confirm.
PACKAGE_RE = re.compile(r"(([0-9]+):)?(?P<pn>[^:]+)-.*[^-:]+-.*")


def _get_last_repobased_eval_tms(cur):
Expand All @@ -52,8 +55,11 @@ def _set_last_repobased_eval_tms(cur, timestamp: dt.datetime):
return ret


def _get_updated_repos(conn) -> list:
"""Get repos updated since last repo-based evaluation"""
def _get_updated_repos(conn) -> dict[str, list[str]]:
"""
Get repos updated since last repo-based evaluation
{ repo_name: [updated_package1, updated_package2, ...], repo_name2: ... }
"""
with conn.cursor() as cur:
modified_since_dt = _get_last_repobased_eval_tms(cur)
# last modified timestamp
Expand All @@ -73,11 +79,23 @@ def _select_repo_based_inventory_ids(cur, repos: list):
"""Select inventory-ids connected with inserted repos, don't fetch it."""
if repos:
cur.execute(
"""select sp.inventory_id, ra.account_number, ra.org_id from system_platform as sp
join rh_account as ra on sp.rh_account_id = ra.id
where sp.when_deleted is null and
sp.id in (select distinct system_id from system_repo where repo_id in
(select id from repo where name in %s)) order by sp.rh_account_id""",
"""SELECT repo.name AS repo_name, sp.inventory_id, ra.org_id, vmaas_json
FROM system_platform AS sp
JOIN rh_account AS ra ON sp.rh_account_id = ra.id
JOIN system_repo sr ON sp.id = sr.system_id
JOIN repo ON repo.id = sr.repo_id
WHERE sp.when_deleted IS NULL
AND sp.id IN (
SELECT DISTINCT system_id
FROM system_repo
WHERE repo_id IN (
SELECT id
FROM repo
WHERE name IN %s
)
)
ORDER BY sp.rh_account_id;
""",
(tuple(repos),),
)
else:
Expand All @@ -87,9 +105,11 @@ def _select_repo_based_inventory_ids(cur, repos: list):
def _select_all_inventory_ids(cur):
"""Select all inventory-ids, don't fetch it."""
cur.execute(
"""select sp.inventory_id, ra.account_number, ra.org_id from system_platform as sp
join rh_account as ra on sp.rh_account_id = ra.id
where sp.when_deleted is null order by sp.rh_account_id"""
"""SELECT NULL AS repo_name, sp.inventory_id, ra.org_id, NULL AS vmaas_json
FROM system_platform AS sp
JOIN rh_account AS ra ON sp.rh_account_id = ra.id
WHERE sp.when_deleted IS NULL
ORDER BY sp.rh_account_id"""
)


Expand Down Expand Up @@ -177,6 +197,14 @@ def _fetch_vmaas_cves() -> Dict[str, Any]:
return result


def _extract_updated_packages(page) -> dict[str, list[str]]:
result = {}
for repo_name, value in page["repository_list"].items():
updated_packages = [name for pkgs in value for name in pkgs["updated_package_names"]]
result[repo_name] = updated_packages
return result


def _fetch_vmaas_repos(modified_since: str) -> Dict[str, Any]:
"""Fetch repositories from VMaaS, based on the modification timestamp stored in DB"""
req = {"repository_list": [".*"], "page": 1, "page_size": CFG.default_page_size, "modified_since": modified_since}
Expand All @@ -185,9 +213,9 @@ def _fetch_vmaas_repos(modified_since: str) -> Dict[str, Any]:
return {}

result = {"latest_repo_change": None}
repos = []
repos = {}
for page in repos_pages:
repos.extend(page["repository_list"].keys())
repos |= _extract_updated_packages(page)
result["last_change"] = page["last_change"]
if page["latest_repo_change"]:
ts = parser.parse(page["latest_repo_change"])
Expand Down Expand Up @@ -281,19 +309,49 @@ def sync_cve_md():
return True


def _create_message_list(rows: list[tuple[str, str, str, str]]):
return [
{
"type": "re-evaluate_system",
"host": {"id": inventory_id, "org_id": org_id},
"timestamp": str(datetime.now(timezone.utc)),
}
for _, inventory_id, org_id, _ in rows
]


def _prepare_messages(updated_repos: dict[str, list[str]], rows: list[tuple[str, str, str, str]], repo_based: bool):
    """Build re-evaluation messages for one batch of DB rows.

    Each row is ``(repo_name, inventory_id, org_id, vmaas_json)``.

    When ``repo_based`` is false, every row yields a message. Otherwise a
    system is scheduled only if at least one package name from its
    ``vmaas_json`` package list appears among the updated package names of
    the row's repository. Each system is scheduled at most once per call,
    even when several of its rows match.

    Rows are skipped (instead of raising) when the repository is not present
    in ``updated_repos``, when ``vmaas_json`` is empty/NULL, or when a package
    string does not match ``PACKAGE_RE``.
    """
    if not repo_based:
        return _create_message_list(rows)

    systems_to_reevaluate = []
    scheduled_ids = set()
    for repo_name, inventory_id, org_id, vmaas_json in rows:
        # one message per system is enough; skip before the costly JSON parse
        if inventory_id in scheduled_ids:
            continue
        # the row's repo may be one that was not updated (a system is listed once per
        # attached repo), so a plain index lookup would raise KeyError
        updated_names = updated_repos.get(repo_name)
        if not updated_names or not vmaas_json:
            continue

        system_packages = set()
        for package in json.loads(vmaas_json).get("package_list") or []:
            matched = PACKAGE_RE.match(package)
            if matched:  # ignore strings that are not in the expected format
                system_packages.add(matched.group("pn"))

        # intersection exists (there are some pkgs which changed) => reevaluate system
        if not system_packages.isdisjoint(updated_names):
            scheduled_ids.add(inventory_id)
            systems_to_reevaluate.append((None, inventory_id, org_id, None))

    return _create_message_list(systems_to_reevaluate)


def re_evaluate_systems():
"""Schedule re-evaluation for systems in DB."""
with DatabasePoolConnection() as conn:
updated_repos = []
if CFG.enable_repo_based_re_evaluation:
updated_repos = _get_updated_repos(conn)
updated_repos: dict[str, list[str]] = _get_updated_repos(conn)
if not updated_repos:
LOGGER.info("Fetched 0 repos from VMaaS, skipping repo-based re-evaluation")
return

with NamedCursor(conn) as cur:
if CFG.enable_repo_based_re_evaluation:
LOGGER.info("Re-evaluating in repo-based mode")
_select_repo_based_inventory_ids(cur, updated_repos)
_select_repo_based_inventory_ids(cur, updated_repos.keys())
else:
LOGGER.info("Re-evaluating all systems")
_select_all_inventory_ids(cur)
Expand All @@ -305,14 +363,7 @@ def re_evaluate_systems():
if not rows:
BATCH_SEMAPHORE.release()
break
msgs = [
{
"type": "re-evaluate_system",
"host": {"id": inventory_id, "org_id": org_id},
"timestamp": str(datetime.now(timezone.utc)),
}
for inventory_id, _, org_id in rows
]
msgs = _prepare_messages(updated_repos, rows, CFG.enable_repo_based_re_evaluation)
total_scheduled += len(msgs)
future = EVALUATOR_QUEUE.send_list(msgs, loop=loop)
future.add_done_callback(lambda x: BATCH_SEMAPHORE.release())
Expand Down

0 comments on commit 8422f75

Please sign in to comment.