From 749c43f5e9df2bea9c91683827ad139c92fdd00d Mon Sep 17 00:00:00 2001 From: ivan Date: Fri, 5 Jul 2024 15:44:52 +0500 Subject: [PATCH] Monitoring added into Http Loops --- smart_kit/start_points/main_loop_async_http.py | 8 +++++++- smart_kit/start_points/main_loop_http.py | 3 +++ smart_kit/start_points/main_loop_kafka.py | 3 +-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/smart_kit/start_points/main_loop_async_http.py b/smart_kit/start_points/main_loop_async_http.py index 0d92ed50..915fbceb 100644 --- a/smart_kit/start_points/main_loop_async_http.py +++ b/smart_kit/start_points/main_loop_async_http.py @@ -127,15 +127,18 @@ async def handle_message(self, message: SmartAppFromMessage) -> typing.Tuple[int code = 204 log(f"OUTGOING DATA: {answer.masked_value} with code: {code}", params={log_const.KEY_NAME: "outgoing_policy_message"}, user=user) + monitoring.counter_outgoing(self.app_name, answer.command.name, answer.command, user) return code, "NO CONTENT", answer answer_message = SmartAppToMessage( - answer, message, request=None, validators=self.to_msg_validators, masking_fields=self.masking_fields) + answer, message, request=None, validators=self.to_msg_validators, masking_fields=self.masking_fields + ) if answer_message.validate(): code = 200 log_answer = str(answer_message.masked_value).replace("%", "%%") log(f"OUTGOING DATA: {log_answer} with code: {code}", params={log_const.KEY_NAME: "outgoing_policy_message"}, user=user) + monitoring.counter_outgoing(self.app_name, answer.command.name, answer.command, user) return code, "OK", answer_message else: code = 500 @@ -143,6 +146,7 @@ async def handle_message(self, message: SmartAppFromMessage) -> typing.Tuple[int self.BAD_ANSWER_COMMAND, message=message, request=None, masking_fields=self.masking_fields) log(f"OUTGOING DATA: {answer.masked_value} with code: {code}", params={log_const.KEY_NAME: "outgoing_policy_message"}, user=user) + monitoring.counter_outgoing(self.app_name, answer.command.name, answer.command, user) return code, "BAD ANSWER", answer async def process_message(self, message: SmartAppFromMessage, *args, **kwargs): @@ -153,6 +157,7 @@ async def process_message(self, message: SmartAppFromMessage, *args, **kwargs): with StatsTimer() as load_timer: user = await self.load_user(db_uid, message) + monitoring.sampling_load_time(self.app_name, load_timer.secs) stats += "Loading time: {} msecs\n".format(load_timer.msecs) with StatsTimer() as script_timer: commands = await self.model.answer(message, user) @@ -164,6 +169,7 @@ async def process_message(self, message: SmartAppFromMessage, *args, **kwargs): stats += "Script time: {} msecs\n".format(script_timer.msecs) with StatsTimer() as save_timer: await self.save_user(db_uid, user, message) + monitoring.sampling_save_time(self.app_name, save_timer.secs) stats += "Saving time: {} msecs\n".format(save_timer.msecs) log(stats, params={log_const.KEY_NAME: "timings"}) await self.postprocessor.postprocess(user, message) diff --git a/smart_kit/start_points/main_loop_http.py b/smart_kit/start_points/main_loop_http.py index 6d677b83..76a319b3 100644 --- a/smart_kit/start_points/main_loop_http.py +++ b/smart_kit/start_points/main_loop_http.py @@ -9,6 +9,7 @@ from core.configs.global_constants import CALLBACK_ID_HEADER from core.logging.logger_utils import log from core.message.from_message import SmartAppFromMessage, basic_error_message +from core.monitoring.monitoring import monitoring from core.utils.stats_timer import StatsTimer from smart_kit.compatibility.commands import combine_commands from smart_kit.message.smartapp_to_message import SmartAppToMessage @@ -70,6 +71,7 @@ def process_message(self, message: SmartAppFromMessage, *args, **kwargs): db_uid = message.db_uid with StatsTimer() as load_timer: user = self.loop.run_until_complete(self.load_user(db_uid, message)) + monitoring.sampling_load_time(self.app_name, load_timer.secs) stats += "Loading time: {} msecs\n".format(load_timer.msecs) with StatsTimer() as script_timer: commands = asyncio.get_event_loop().run_until_complete(self.model.answer(message, user)) @@ -81,6 +83,7 @@ def process_message(self, message: SmartAppFromMessage, *args, **kwargs): stats += "Script time: {} msecs\n".format(script_timer.msecs) with StatsTimer() as save_timer: self.loop.run_until_complete(self.save_user(db_uid, user, message)) + monitoring.sampling_save_time(self.app_name, save_timer.secs) stats += "Saving time: {} msecs\n".format(save_timer.msecs) log(stats, user=user, params={log_const.KEY_NAME: "timings"}) self.loop.run_until_complete(self.postprocessor.postprocess(user, message)) diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index 0d3cf87b..44d1bafe 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -356,8 +356,6 @@ def _generate_answers(self, user, commands, message, **kwargs): else: answers.append(SmartAppToMessage(self.BAD_ANSWER_COMMAND, message=message, request=request)) - monitoring.counter_outgoing(self.app_name, command.name, answer, user) - return answers def _get_timeout_from_message(self, orig_message_raw: Dict, callback_id, headers): @@ -603,6 +601,7 @@ async def _send_request(self, user: BaseUser, answer: SmartAppToMessage, mq_mess request_params["mq_message"] = mq_message request_params["payload"] = answer.value request_params["masked_value"] = answer.masked_value + monitoring.counter_outgoing(self.app_name, answer.command.name, answer.command, user) await request.run(answer.value.encode(), request_params) self._log_request(user, request, answer, mq_message)