diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index a0c2b925..492a4e0d 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -273,20 +273,26 @@ async def poll_kafka(self, kafka_key, queues): consumer = self.consumers[kafka_key] log_params = {log_const.KEY_NAME: "timings_polling"} while self.is_work: - with StatsTimer() as poll_timer: - # Max delay between polls configured in consumer.poll_timeout param - mq_message = await consumer.poll() - log_params["kafka_polling"] = poll_timer.msecs - if poll_timer.msecs > self.MAX_LOG_TIME: # TODO align with new async interface - log("Long poll time: %(kafka_polling)s msecs\n", params=log_params, level="WARNING") - if mq_message: - kwargs = {"kafka_key": kafka_key, - "mq_message": mq_message} - not_empty_queues_count = await self.put_to_queue(mq_message, self.do_incoming_handling, kwargs) - log(f"Poll time: %(kafka_polling)s msecs\n, not_empty_queues count: {not_empty_queues_count}.", - params=log_params, level="INFO") - else: - await asyncio.sleep(self.no_kafka_messages_poll_time) # callbacks can work here + try: + with StatsTimer() as poll_timer: + # Max delay between polls configured in consumer.poll_timeout param + mq_message = await consumer.poll() + log_params["kafka_polling"] = poll_timer.msecs + if poll_timer.msecs > self.MAX_LOG_TIME: # TODO align with new async interface + log("Long poll time: %(kafka_polling)s msecs\n", params=log_params, level="WARNING") + if mq_message: + kwargs = {"kafka_key": kafka_key, + "mq_message": mq_message} + not_empty_queues_count = await self.put_to_queue(mq_message, self.do_incoming_handling, kwargs) + log(f"Poll time: %(kafka_polling)s msecs\n, not_empty_queues count: {not_empty_queues_count}.", + params=log_params, level="INFO") + else: + await asyncio.sleep(self.no_kafka_messages_poll_time) # callbacks can work here + except Exception: + log("%(class_name)s poll_kafka error. Kafka key %(kafka_key)s", + params={log_const.KEY_NAME: "poll_kafka_error", + "kafka_key": kafka_key}, + level="ERROR", exc_info=True) log("Stop poll_kafka consumer.")