Skip to content

Commit

Permalink
feat: add recalc admin api
Browse files Browse the repository at this point in the history
RHINENG-15130
  • Loading branch information
psegedy authored and jdobes committed Jan 14, 2025
1 parent a14acd4 commit 4b70af5
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 3 deletions.
3 changes: 3 additions & 0 deletions conf/reevaluation.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
MESSAGE_TOPIC=vulnerability.evaluator.recalc
RE_EVALUATION_KAFKA_BATCH_SIZE=10000
RE_EVALUATION_KAFKA_BATCHES=10
3 changes: 0 additions & 3 deletions conf/vmaas-sync.env
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
POSTGRES_USER=ve_db_user_vmaas_sync
POSTGRES_PASSWORD=ve_db_user_vmaas_sync_pwd
MESSAGE_TOPIC=vulnerability.evaluator.recalc
PROMETHEUS_PORT=8087
ENABLE_RE_EVALUATION=YES
RE_EVALUATION_KAFKA_BATCH_SIZE=10000
RE_EVALUATION_KAFKA_BATCHES=10
DEFAULT_PAGE_SIZE=5000
DEFAULT_REPO_PAGE_SIZE=200
6 changes: 6 additions & 0 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ objects:
value: ${ENV_NAME}
- name: UNLEASH_BOOTSTRAP_FILE
value: ${UNLEASH_BOOTSTRAP_FILE}
- name: MESSAGE_TOPIC
value: vulnerability.evaluator.recalc
- name: RE_EVALUATION_KAFKA_BATCH_SIZE
value: '10000'
- name: RE_EVALUATION_KAFKA_BATCHES
value: '10'
resources:
limits:
cpu: ${CPU_LIMIT_MANAGER_ADMIN}
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ services:
- ./conf/common.env
- ./conf/manager_base.env
- ./conf/database.env
- ./conf/reevaluation.env
ports:
- 8400:8000
depends_on:
Expand Down Expand Up @@ -90,6 +91,7 @@ services:
env_file:
- ./conf/common.env
- ./conf/vmaas-sync.env
- ./conf/reevaluation.env
networks:
- default
- vmaas_default
Expand Down
52 changes: 52 additions & 0 deletions manager.admin.spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,58 @@ paths:
200:
description: Partition was truncated

/recalc/accounts:
put:
summary: Trigger recalc of all systems in specified account org_ids.
description: Trigger recalc of all systems in specified account org_ids. Admin interface, available only to admin users.
operationId: manager.admin_handler.RecalcAccounts.put
x-methodName: recalcAccounts
security:
- ApiKeyAuthAdmin: []
requestBody:
description: Account org_ids to be re-evaluated.
required: true
content:
application/vnd.api+json:
schema:
type: object
properties:
org_ids:
type: array
items:
type: string
description: List of org_ids to be recalculated.
example: ['123456', '654321']
responses:
200:
description: Systems from account are scheduled for recalculation.

/recalc/systems:
put:
summary: Trigger recalc of specified inventory_ids.
description: Trigger recalc of specified inventory_ids. Admin interface, available only to admin users.
operationId: manager.admin_handler.RecalcSystems.put
x-methodName: recalcSystems
security:
- ApiKeyAuthAdmin: []
requestBody:
description: System inventory_ids to be re-evaluated.
required: true
content:
application/vnd.api+json:
schema:
type: object
properties:
inventory_ids:
type: array
items:
type: string
description: List of inventory_ids to be recalculated.
example: ['INV-ID-0000-1234', 'INV-ID-0000-5678']
responses:
200:
description: Systems are scheduled for recalculation.

components:
parameters:
inventory_id:
Expand Down
1 change: 1 addition & 0 deletions manager/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@


init_logging(num_servers=CFG.gunicorn_workers)

# gunicorn expects an object called "application" hence the pylint disable
application = create_app({CFG.default_route: "manager.admin.spec.yaml", # pylint: disable=invalid-name
"": "manager.healthz.spec.yaml"})
87 changes: 87 additions & 0 deletions manager/admin_handler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
"""
Module for admin API endpoints
"""
import asyncio
import json
import subprocess
from datetime import datetime
from datetime import timezone

import requests
from connexion import context
from peewee import DatabaseError

from common import mqueue
from common.config import Config
from common.constants import EvaluatorMessageType
from common.logging import get_logger
from common.peewee_model import DB
from common.peewee_model import Announcement
Expand All @@ -28,6 +33,9 @@
CFG = Config()

LOGGER = get_logger(__name__)
EVENT_LOOP = asyncio.new_event_loop()
EVALUATOR_QUEUE = mqueue.MQWriter(CFG.evaluator_recalc_topic, loop=EVENT_LOOP)
BATCH_SEMAPHORE = asyncio.BoundedSemaphore(CFG.re_evaluation_kafka_batches)


class GetMissingInInventory(GetRequest):
Expand Down Expand Up @@ -463,3 +471,82 @@ def handle_delete(cls, **kwargs):
LOGGER.error("Internal server error: %s", exc)
return "Error", 500
return "Ok", 200


class RecalcBase:

@classmethod
def _create_kafka_msg_task(cls, rows, loop):
msgs = [
{
"type": EvaluatorMessageType.RE_EVALUATE_SYSTEM,
"host": {"id": str(inventory_id), "org_id": org_id},
"timestamp": str(datetime.now(timezone.utc)),
}
for inventory_id, org_id in rows
]
task = EVALUATOR_QUEUE.send_list(msgs, loop=loop)
return task, len(msgs)


class RecalcAccounts(PutRequest, RecalcBase):
"""PUT to /v1/recalc/accounts"""

_endpoint_name = r"/v1/recalc/accounts"

@classmethod
def handle_put(cls, **kwargs):
"""Trigger recalc of all systems in specified account org_ids."""
loop = EVENT_LOOP
total_scheduled = 0
LOGGER.info("kwargs: %s", kwargs)
accounts = kwargs.get("body", {}).get("org_ids", [])
if not accounts:
return "No org_ids provided", 400

with DB.cursor() as cur:
cur.execute("""
SELECT inventory_id, org_id
FROM system_platform sp JOIN rh_account acc ON sp.rh_account_id = acc.id
WHERE org_id = ANY(%s)""", (accounts,))
while True:
loop.run_until_complete(BATCH_SEMAPHORE.acquire())
rows = cur.fetchmany(size=CFG.re_evaluation_kafka_batch_size)
if not rows:
BATCH_SEMAPHORE.release()
break
task, msg_count = cls._create_kafka_msg_task(rows, loop)
total_scheduled += msg_count
task.add_done_callback(lambda x: BATCH_SEMAPHORE.release())
loop.run_until_complete(task)

return f"{total_scheduled} systems scheduled for re-evaluation", 200


class RecalcSystems(PutRequest, RecalcBase):
"""PUT to /v1/recalc/systems"""

_endpoint_name = r"/v1/recalc/systems"

@classmethod
def handle_put(cls, **kwargs):
"""Trigger recalc of specified inventory_ids."""
loop = EVENT_LOOP
total_scheduled = 0
inventory_ids = kwargs.get("body", {}).get("inventory_ids", [])
if not inventory_ids:
return "No inventory_ids provided", 400

with DB.cursor() as cur:
cur.execute("""
SELECT inventory_id, org_id
FROM system_platform sp JOIN rh_account acc ON sp.rh_account_id = acc.id
WHERE inventory_id = ANY(%s::uuid[])""", (inventory_ids,))
rows = cur.fetchall()
if not rows:
return f"{total_scheduled} systems scheduled for re-evaluation", 200

task, total_scheduled = cls._create_kafka_msg_task(rows, loop)
loop.run_until_complete(task)

return f"{total_scheduled} systems scheduled for re-evaluation", 200

0 comments on commit 4b70af5

Please sign in to comment.