forked from RedHatInsights/insights-host-inventory
-
Notifications
You must be signed in to change notification settings - Fork 0
/
inv_mq_service.py
45 lines (33 loc) · 1.33 KB
/
inv_mq_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from functools import partial
from kafka import KafkaConsumer
from prometheus_client import start_http_server
from app import create_app
from app.environment import RuntimeEnvironment
from app.logging import get_logger
from app.queue.event_producer import EventProducer
from app.queue.queue import event_loop
from app.queue.queue import handle_message
from lib.handlers import register_shutdown
from lib.handlers import ShutdownHandler
logger = get_logger("mq_service")
def main():
application = create_app(RuntimeEnvironment.SERVICE)
start_http_server(9126)
config = application.config["INVENTORY_CONFIG"]
consumer = KafkaConsumer(
config.host_ingress_topic,
group_id=config.host_ingress_consumer_group,
bootstrap_servers=config.bootstrap_servers,
api_version=(0, 10, 1),
value_deserializer=lambda m: m.decode(),
**config.kafka_consumer,
)
consumer_shutdown = partial(consumer.close, autocommit=True)
register_shutdown(consumer_shutdown, "Closing consumer")
event_producer = EventProducer(config)
register_shutdown(event_producer.close, "Closing producer")
shutdown_handler = ShutdownHandler()
shutdown_handler.register()
event_loop(consumer, application, event_producer, handle_message, shutdown_handler.shut_down)
if __name__ == "__main__":
main()