Skip to content

Commit

Permalink
fix(vmaas_sync): Revert recalc system with updated packages in repos
Browse files Browse the repository at this point in the history
Additional work needs to be done; reverting the change so that it does not block the release to prod.

RHINENG-10107
RHINENG-10893

This reverts commit a776d33.
This reverts commit 97e9354.
This reverts commit d8cef5a.
This reverts commit 8422f75.
  • Loading branch information
jdobes committed Jun 27, 2024
1 parent 7ef119e commit a469448
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 83 deletions.
3 changes: 0 additions & 3 deletions tests/scripts/vmaas_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,6 @@
"product": "Red Hat Enterprise Linux Server",
"revision": "2019-07-17T12:53:14.950647+00:00",
"last_change": "2019-07-17T18:00:00+00:00",
"updated_package_names": [],
}
],
"rhel-7-server-rpms": [
Expand All @@ -445,7 +444,6 @@
"product": "Red Hat Enterprise Linux Server",
"revision": "2019-07-17T12:53:14.950647+00:00",
"last_change": "2019-07-17T14:00:00+00:00",
"updated_package_names": [],
}
],
"vmaas-test-rhel8-module": [
Expand All @@ -458,7 +456,6 @@
"product": "VMaaS testing - modularity",
"revision": "2019-07-17T12:53:14.950647+00:00",
"last_change": "2019-07-17T14:00:00+00:00",
"updated_package_names": [],
}
],
},
Expand Down
103 changes: 23 additions & 80 deletions vmaas_sync/vmaas_sync.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
"""VMaaS sync module."""
import asyncio
import datetime as dt
import json
import re
from datetime import datetime
from datetime import timezone
from typing import Any
Expand Down Expand Up @@ -32,7 +30,6 @@

EVALUATOR_QUEUE = mqueue.MQWriter(CFG.evaluator_recalc_topic)
BATCH_SEMAPHORE = asyncio.BoundedSemaphore(CFG.re_evaluation_kafka_batches)
PACKAGE_RE = re.compile(r"(([0-9]+):)?(?P<pn>[^:]+)-.*[^-:]+-.*")


def _get_last_repobased_eval_tms(cur):
Expand All @@ -55,11 +52,8 @@ def _set_last_repobased_eval_tms(cur, timestamp: dt.datetime):
return ret


def _get_updated_repos(conn) -> dict[str, list[str]]:
"""
Get repos updated since last repo-based evaluation
{ repo_name: [updated_package1, updated_package2, ...], repo_name2: ... }
"""
def _get_updated_repos(conn) -> list:
"""Get repos updated since last repo-based evaluation"""
with conn.cursor() as cur:
modified_since_dt = _get_last_repobased_eval_tms(cur)
# last modified timestamp
Expand All @@ -79,23 +73,11 @@ def _select_repo_based_inventory_ids(cur, repos: list):
"""Select inventory-ids connected with inserted repos, don't fetch it."""
if repos:
cur.execute(
"""SELECT repo.name AS repo_name, sp.inventory_id, ra.org_id, vmaas_json
FROM system_platform AS sp
JOIN rh_account AS ra ON sp.rh_account_id = ra.id
JOIN system_repo sr ON sp.id = sr.system_id
JOIN repo ON repo.id = sr.repo_id
WHERE sp.when_deleted IS NULL
AND sp.id IN (
SELECT DISTINCT system_id
FROM system_repo
WHERE repo_id IN (
SELECT id
FROM repo
WHERE name IN %s
)
)
ORDER BY sp.rh_account_id;
""",
"""select sp.inventory_id, ra.account_number, ra.org_id from system_platform as sp
join rh_account as ra on sp.rh_account_id = ra.id
where sp.when_deleted is null and
sp.id in (select distinct system_id from system_repo where repo_id in
(select id from repo where name in %s)) order by sp.rh_account_id""",
(tuple(repos),),
)
else:
Expand All @@ -105,11 +87,9 @@ def _select_repo_based_inventory_ids(cur, repos: list):
def _select_all_inventory_ids(cur):
"""Select all inventory-ids, don't fetch it."""
cur.execute(
"""SELECT NULL AS repo_name, sp.inventory_id, ra.org_id, NULL AS vmaas_json
FROM system_platform AS sp
JOIN rh_account AS ra ON sp.rh_account_id = ra.id
WHERE sp.when_deleted IS NULL
ORDER BY sp.rh_account_id"""
"""select sp.inventory_id, ra.account_number, ra.org_id from system_platform as sp
join rh_account as ra on sp.rh_account_id = ra.id
where sp.when_deleted is null order by sp.rh_account_id"""
)


Expand Down Expand Up @@ -197,31 +177,17 @@ def _fetch_vmaas_cves() -> Dict[str, Any]:
return result


def _extract_updated_packages(page) -> dict[str, list[str]]:
result = {}
for repo_name, value in page["repository_list"].items():
updated_packages = [name for pkgs in value for name in pkgs.get("updated_package_names", [])]
result[repo_name] = updated_packages
return result


def _fetch_vmaas_repos(modified_since: str) -> Dict[str, Any]:
"""Fetch repositories from VMaaS, based on the modification timestamp stored in DB"""
req = {
"repository_list": [".*"],
"page": 1,
"page_size": CFG.default_page_size,
"modified_since": modified_since,
"show_packages": True,
}
req = {"repository_list": [".*"], "page": 1, "page_size": CFG.default_page_size, "modified_since": modified_since}
success, repos_pages = _fetch_vmaas_pages(VMAAS_REPOS_ENDPOINT, req, "POST")
if not success:
return {}

result = {"latest_repo_change": None}
repos = {}
repos = []
for page in repos_pages:
repos |= _extract_updated_packages(page)
repos.extend(page["repository_list"].keys())
result["last_change"] = page["last_change"]
if page["latest_repo_change"]:
ts = parser.parse(page["latest_repo_change"])
Expand Down Expand Up @@ -315,49 +281,19 @@ def sync_cve_md():
return True


def _create_message_list(rows: list[tuple[str, str, str, str]]):
return [
{
"type": "re-evaluate_system",
"host": {"id": inventory_id, "org_id": org_id},
"timestamp": str(datetime.now(timezone.utc)),
}
for _, inventory_id, org_id, _ in rows
]


def _prepare_messages(updated_repos: dict[str, list[str]], rows: list[tuple[str, str, str, str]], repo_based: bool):
    """
    Build re-evaluation messages for one batch of rows.

    row: (repo_name, inventory_id, org_id, vmaas_json)

    When `repo_based` is false, every row yields a message. Otherwise a system
    is scheduled only if at least one package name parsed from the
    "package_list" of its `vmaas_json` is among the updated package names of
    the repo named in the row. Within a batch each (inventory_id, org_id) pair
    is scheduled at most once, even when it matches through several repo rows.
    """
    if not repo_based:
        return _create_message_list(rows)

    # Updated-package sets are built once per repo per batch, not once per row.
    updated_by_repo: dict[str, set[str]] = {}
    # A dict is used as an insertion-ordered set so output order follows `rows`.
    scheduled: dict[tuple[str, str], None] = {}
    for repo_name, inventory_id, org_id, vmaas_json in rows:
        system_key = (inventory_id, org_id)
        if system_key in scheduled:
            continue

        if repo_name not in updated_by_repo:
            updated_by_repo[repo_name] = set(updated_repos.get(repo_name, ()))
        updated_packages = updated_by_repo[repo_name]

        # Nothing can intersect when the repo has no updated packages or the
        # system has no stored request (NULL/empty vmaas_json); skip the parse.
        if not updated_packages or not vmaas_json:
            continue

        package_list = json.loads(vmaas_json).get("package_list") or []
        # Package strings that do not look like a NEVRA are ignored rather than
        # crashing the whole batch on `None.group(...)`.
        matches = (PACKAGE_RE.match(pkg) for pkg in package_list)
        system_packages = {found.group("pn") for found in matches if found is not None}

        # intersection exists (there are some pkgs which changed) => reevaluate system
        if updated_packages & system_packages:
            scheduled[system_key] = None

    return _create_message_list([(None, inventory_id, org_id, None) for inventory_id, org_id in scheduled])


def re_evaluate_systems():
"""Schedule re-evaluation for systems in DB."""
with DatabasePoolConnection() as conn:
updated_repos = []
if CFG.enable_repo_based_re_evaluation:
updated_repos: dict[str, list[str]] = _get_updated_repos(conn)
updated_repos = _get_updated_repos(conn)
if not updated_repos:
LOGGER.info("Fetched 0 repos from VMaaS, skipping repo-based re-evaluation")
return

with NamedCursor(conn) as cur:
if CFG.enable_repo_based_re_evaluation:
LOGGER.info("Re-evaluating in repo-based mode")
_select_repo_based_inventory_ids(cur, updated_repos.keys())
_select_repo_based_inventory_ids(cur, updated_repos)
else:
LOGGER.info("Re-evaluating all systems")
_select_all_inventory_ids(cur)
Expand All @@ -369,7 +305,14 @@ def re_evaluate_systems():
if not rows:
BATCH_SEMAPHORE.release()
break
msgs = _prepare_messages(updated_repos, rows, CFG.enable_repo_based_re_evaluation)
msgs = [
{
"type": "re-evaluate_system",
"host": {"id": inventory_id, "org_id": org_id},
"timestamp": str(datetime.now(timezone.utc)),
}
for inventory_id, _, org_id in rows
]
total_scheduled += len(msgs)
future = EVALUATOR_QUEUE.send_list(msgs, loop=loop)
future.add_done_callback(lambda x: BATCH_SEMAPHORE.release())
Expand Down

0 comments on commit a469448

Please sign in to comment.