From 3725d662fe4d2f6aab9c065d29fe1b38d2801aa9 Mon Sep 17 00:00:00 2001 From: Makar Shevchenko Date: Mon, 20 Nov 2023 01:37:37 +0300 Subject: [PATCH] DPNLPF-2185: add try except for poll_kafka --- smart_kit/start_points/main_loop_kafka.py | 34 +++++++++++++---------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index a0c2b925..492a4e0d 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -273,20 +273,26 @@ async def poll_kafka(self, kafka_key, queues): consumer = self.consumers[kafka_key] log_params = {log_const.KEY_NAME: "timings_polling"} while self.is_work: - with StatsTimer() as poll_timer: - # Max delay between polls configured in consumer.poll_timeout param - mq_message = await consumer.poll() - log_params["kafka_polling"] = poll_timer.msecs - if poll_timer.msecs > self.MAX_LOG_TIME: # TODO align with new async interface - log("Long poll time: %(kafka_polling)s msecs\n", params=log_params, level="WARNING") - if mq_message: - kwargs = {"kafka_key": kafka_key, - "mq_message": mq_message} - not_empty_queues_count = await self.put_to_queue(mq_message, self.do_incoming_handling, kwargs) - log(f"Poll time: %(kafka_polling)s msecs\n, not_empty_queues count: {not_empty_queues_count}.", - params=log_params, level="INFO") - else: - await asyncio.sleep(self.no_kafka_messages_poll_time) # callbacks can work here + try: + with StatsTimer() as poll_timer: + # Max delay between polls configured in consumer.poll_timeout param + mq_message = await consumer.poll() + log_params["kafka_polling"] = poll_timer.msecs + if poll_timer.msecs > self.MAX_LOG_TIME: # TODO align with new async interface + log("Long poll time: %(kafka_polling)s msecs\n", params=log_params, level="WARNING") + if mq_message: + kwargs = {"kafka_key": kafka_key, + "mq_message": mq_message} + not_empty_queues_count = await self.put_to_queue(mq_message, self.do_incoming_handling, kwargs) + log(f"Poll time: %(kafka_polling)s msecs\n, not_empty_queues count: {not_empty_queues_count}.", + params=log_params, level="INFO") + else: + await asyncio.sleep(self.no_kafka_messages_poll_time) # callbacks can work here + except Exception: + log("%(class_name)s poll_kafka error. Kafka key %(kafka_key)s", + params={log_const.KEY_NAME: "poll_kafka_error", + "kafka_key": kafka_key}, + level="ERROR", exc_info=True) log("Stop poll_kafka consumer.")