Skip to content

Commit

Permalink
Merge pull request #169 from salute-developers/fix/DPNLPF-2185_try_ex…
Browse files Browse the repository at this point in the history
…cept_poll_kafka

DPNLPF-2185: add try except for poll_kafka
  • Loading branch information
SyrexMinus authored Nov 19, 2023
2 parents 7800aed + 3725d66 commit 45b368e
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions smart_kit/start_points/main_loop_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,20 +273,26 @@ async def poll_kafka(self, kafka_key, queues):
consumer = self.consumers[kafka_key]
log_params = {log_const.KEY_NAME: "timings_polling"}
while self.is_work:
with StatsTimer() as poll_timer:
# Max delay between polls configured in consumer.poll_timeout param
mq_message = await consumer.poll()
log_params["kafka_polling"] = poll_timer.msecs
if poll_timer.msecs > self.MAX_LOG_TIME: # TODO align with new async interface
log("Long poll time: %(kafka_polling)s msecs\n", params=log_params, level="WARNING")
if mq_message:
kwargs = {"kafka_key": kafka_key,
"mq_message": mq_message}
not_empty_queues_count = await self.put_to_queue(mq_message, self.do_incoming_handling, kwargs)
log(f"Poll time: %(kafka_polling)s msecs\n, not_empty_queues count: {not_empty_queues_count}.",
params=log_params, level="INFO")
else:
await asyncio.sleep(self.no_kafka_messages_poll_time) # callbacks can work here
try:
with StatsTimer() as poll_timer:
# Max delay between polls configured in consumer.poll_timeout param
mq_message = await consumer.poll()
log_params["kafka_polling"] = poll_timer.msecs
if poll_timer.msecs > self.MAX_LOG_TIME: # TODO align with new async interface
log("Long poll time: %(kafka_polling)s msecs\n", params=log_params, level="WARNING")
if mq_message:
kwargs = {"kafka_key": kafka_key,
"mq_message": mq_message}
not_empty_queues_count = await self.put_to_queue(mq_message, self.do_incoming_handling, kwargs)
log(f"Poll time: %(kafka_polling)s msecs\n, not_empty_queues count: {not_empty_queues_count}.",
params=log_params, level="INFO")
else:
await asyncio.sleep(self.no_kafka_messages_poll_time) # callbacks can work here
except Exception:
log("%(class_name)s poll_kafka error. Kafka key %(kafka_key)s",
params={log_const.KEY_NAME: "poll_kafka_error",
"kafka_key": kafka_key},
level="ERROR", exc_info=True)

log("Stop poll_kafka consumer.")

Expand Down

0 comments on commit 45b368e

Please sign in to comment.