From 2741ed59defae494d3c9913e93d32255b5dd91bf Mon Sep 17 00:00:00 2001 From: Dmitriy Ivanko Date: Thu, 19 Dec 2024 17:11:30 +0300 Subject: [PATCH] Implement stopwatch feature --- hammett/conf/global_settings.py | 4 ++ hammett/core/application.py | 25 +++++++++- hammett/core/constants.py | 9 ++++ hammett/stopwatch/__init__.py | 10 ++++ hammett/stopwatch/collector.py | 84 +++++++++++++++++++++++++++++++++ hammett/stopwatch/event_loop.py | 50 ++++++++++++++++++++ hammett/stopwatch/stats.py | 66 ++++++++++++++++++++++++++ 7 files changed, 247 insertions(+), 1 deletion(-) create mode 100644 hammett/stopwatch/__init__.py create mode 100644 hammett/stopwatch/collector.py create mode 100644 hammett/stopwatch/event_loop.py create mode 100644 hammett/stopwatch/stats.py diff --git a/hammett/conf/global_settings.py b/hammett/conf/global_settings.py index dfa2ff7..0ba93e7 100644 --- a/hammett/conf/global_settings.py +++ b/hammett/conf/global_settings.py @@ -63,3 +63,7 @@ WEBHOOK_URL_PATH = '' WEBHOOK_URL = '' + +HANDLERS_STOPWATCH = False + +STOPWATCH_STATS_PROCESSOR = 'hammett.stopwatch.stats.PrintStatsProcessor' diff --git a/hammett/core/application.py b/hammett/core/application.py index ef22c7f..217efe7 100644 --- a/hammett/core/application.py +++ b/hammett/core/application.py @@ -1,5 +1,6 @@ """The module contains the implementation of the high-level application class.""" +import asyncio from typing import TYPE_CHECKING, Any from telegram import Update @@ -21,6 +22,7 @@ from hammett.core.handlers import calc_checksum, log_unregistered_handler from hammett.core.permission import apply_permission_to from hammett.error_handler import default_error_handler +from hammett.stopwatch.event_loop import StopWatchEventLoopPolicy from hammett.types import HandlerAlias, HandlerType, JobConfig from hammett.utils.log import configure_logging @@ -29,7 +31,7 @@ from telegram.ext import BasePersistence from telegram.ext._applicationbuilder import ApplicationBuilder - from telegram.ext._utils.types import BD, CD, UD + from telegram.ext._utils.types import BD, BT, CCT, CD, JQ, UD from typing_extensions import Self from hammett.core.mixins import StartMixin @@ -196,6 +198,12 @@ def _register_handlers(self: 'Self', state: 'State', screens: 'Iterable[type[Scr setattr(screen, name, apply_permission_to(handler)) instance_handler = getattr(screen(), name) + + from hammett.conf import settings + from hammett.stopwatch import collect_handler_statistics + if settings.HANDLERS_STOPWATCH: + instance_handler = collect_handler_statistics(instance_handler) + handler_object = self._get_handler_object( instance_handler, handler_type, @@ -227,12 +235,27 @@ def _setup(self: 'Self') -> None: from hammett.conf import settings configure_logging(settings.LOGGING) + if settings.HANDLERS_STOPWATCH: + asyncio.set_event_loop_policy(StopWatchEventLoopPolicy()) + + @staticmethod + async def post_stop(_self: 'NativeApplication[BT, CCT, UD, CD, BD, JQ]') -> None: + """Run after the `NativeApplication` is stopped.""" + from hammett.conf import settings + from hammett.stopwatch.collector import get_stopwatch_stats_processor + + if settings.HANDLERS_STOPWATCH: + processor = get_stopwatch_stats_processor() + await processor.on_exit() + def provide_application_builder(self: 'Self') -> 'ApplicationBuilder': # type: ignore[type-arg] """Return a native application builder.""" from hammett.conf import settings return NativeApplication.builder().read_timeout( settings.APPLICATION_BUILDER_READ_TIMEOUT, + ).post_stop( + self.post_stop, ).token( settings.TOKEN, ) diff --git a/hammett/core/constants.py b/hammett/core/constants.py index a740a62..8cfb708 100644 --- a/hammett/core/constants.py +++ b/hammett/core/constants.py @@ -60,3 +60,12 @@ class LatestMessage(TypedDict): chat_id: int message_id: int hide_keyboard: bool + + +class HandlerStats(TypedDict): + """The class represents information about the handler stats.""" + + cpu_time: float + select_time: float + other_io_time: float + real_time: float diff --git a/hammett/stopwatch/__init__.py b/hammett/stopwatch/__init__.py new file mode 100644 index 0000000..4747208 --- /dev/null +++ b/hammett/stopwatch/__init__.py @@ -0,0 +1,10 @@ +"""The package contains functions to collect handler statistics and further represent them.""" + +__all__ = ( + 'StopWatchEventLoopPolicy', + 'StopWatchSelector', + 'collect_handler_statistics', +) + +from hammett.stopwatch.collector import collect_handler_statistics +from hammett.stopwatch.event_loop import StopWatchEventLoopPolicy, StopWatchSelector diff --git a/hammett/stopwatch/collector.py b/hammett/stopwatch/collector.py new file mode 100644 index 0000000..de93e28 --- /dev/null +++ b/hammett/stopwatch/collector.py @@ -0,0 +1,84 @@ +"""The module contains a decorator that wraps handlers to track time.""" + +import asyncio +import functools +import logging +import time +from typing import TYPE_CHECKING + +from hammett.conf import settings +from hammett.core.exceptions import ImproperlyConfigured +from hammett.stopwatch.event_loop import StopWatchSelector +from hammett.utils.handler import wraps_handler +from hammett.utils.module_loading import import_string + +if TYPE_CHECKING: + from typing import Any + + from hammett.core.constants import HandlerStats + from hammett.stopwatch.stats import BaseStatsProcessor + from hammett.types import Handler + +LOGGER = logging.getLogger(__name__) + + +@functools.cache +def get_stopwatch_stats_processor() -> 'type[BaseStatsProcessor]': + """Import a stopwatch stats processor by the `STOPWATCH_STATS_PROCESSOR` of the module path + and returns it. + """ + try: + processor = import_string(settings.STOPWATCH_STATS_PROCESSOR) + except ImportError as exc: + msg = ( + f'The module could not be imported: {settings.STOPWATCH_STATS_PROCESSOR}. ' + f'Check your STOPWATCH_STATS_PROCESSOR setting.' + ) + raise ImproperlyConfigured(msg) from exc + else: + return processor + + +def collect_handler_statistics(func: 'Handler') -> 'Handler': + """Collect the statistics of the handler and passes them to the `STOPWATCH_STATS`.""" + stats_processor = get_stopwatch_stats_processor() + + @wraps_handler(func) + async def wrapper(*args: 'Any', **kwargs: 'Any') -> 'Any': + try: + event_loop_selector = getattr(asyncio.get_event_loop(), '_selector') # noqa: B009 + except AttributeError: + event_loop_selector = None + + if event_loop_selector is None or not isinstance(event_loop_selector, StopWatchSelector): + is_possible_get_select_time = False + LOGGER.error( + 'Unable to get select time statistic. Perhaps you have changed the ' + 'event loop policy?', + ) + else: + is_possible_get_select_time = True + event_loop_selector.reset_select_time() + + real_time = time.time() + process_time = time.process_time() + + response = await func(*args, **kwargs) + + real_time = time.time() - real_time + cpu_time = time.process_time() - process_time + + select_time = event_loop_selector.select_time if is_possible_get_select_time else 0.0 + other_io_time = max(0.0, real_time - cpu_time - select_time) + + stats: HandlerStats = { + 'cpu_time': cpu_time, + 'select_time': select_time, + 'other_io_time': other_io_time, + 'real_time': real_time, + } + await stats_processor(func, stats).process() + + return response + + return wrapper diff --git a/hammett/stopwatch/event_loop.py b/hammett/stopwatch/event_loop.py new file mode 100644 index 0000000..fb6fa24 --- /dev/null +++ b/hammett/stopwatch/event_loop.py @@ -0,0 +1,50 @@ +"""The module contains the implementation of a modified `EventLoopPolicy` that can track the time +of select. +""" + +import asyncio +import selectors +import time +from typing import TYPE_CHECKING, cast + +if TYPE_CHECKING: + from asyncio.selector_events import BaseSelectorEventLoop + from selectors import SelectorKey + from typing import Any + + +class StopWatchSelector(selectors.DefaultSelector): + """The class implement the tracking of the time of retrieving the task from the event loop.""" + + def __init__(self, *args: 'Any', **kwargs: 'Any') -> None: + """Initialize `select_time` attribute.""" + self.select_time = 0.0 + + super().__init__(*args, **kwargs) + + def reset_select_time(self) -> None: + """Reset select_time value.""" + self.select_time = 0.0 + + def select(self, timeout: float | None = None) -> 'list[tuple[SelectorKey, int]]': + """Track select time if timeout is greater than zero.""" + if timeout is not None and timeout <= 0: + return super().select(timeout) + + start = time.time() + try: + return super().select(timeout) + finally: + self.select_time += time.time() - start + + +class StopWatchEventLoopPolicy(asyncio.DefaultEventLoopPolicy): + """The class implements the `StopWatchSelector` passing in the event loop.""" + + def new_event_loop(self) -> 'BaseSelectorEventLoop': + """Create a new event loop and pass `StopWatchSelector`.""" + selector = StopWatchSelector() + return cast( + 'BaseSelectorEventLoop', + self._loop_factory(selector=selector), # type: ignore[attr-defined] + ) diff --git a/hammett/stopwatch/stats.py b/hammett/stopwatch/stats.py new file mode 100644 index 0000000..83b1217 --- /dev/null +++ b/hammett/stopwatch/stats.py @@ -0,0 +1,66 @@ +"""The module contains classes that implement the processing of handler statistics.""" + +import json +from pathlib import Path +from typing import TYPE_CHECKING + +from hammett.core.handlers import get_handler_name + +if TYPE_CHECKING: + from hammett.core.constants import HandlerStats + from hammett.types import Handler + + +class BaseStatsProcessor: + """Base class for implementing handler statistics processing.""" + + def __init__(self, handler: 'Handler', stats: 'HandlerStats') -> None: + """Save `handler` and `stats` values to the relevant attributes.""" + self.handler = handler + self.stats = stats + + @classmethod + async def on_exit(cls) -> None: + """Run after application is stopped.""" + + async def process(self) -> None: + """Run after handler is called and statistics are retrieved.""" + raise NotImplementedError + + +class PrintStatsProcessor(BaseStatsProcessor): + """The class implements the output of the handler stats via print.""" + + async def process(self) -> None: + """Print handler stats.""" + msg = ( + f'{get_handler_name(self.handler)}:\n' + f' CPU time: {self.stats["cpu_time"]}\n' + f' Select time: {self.stats["select_time"]}\n' + f' Other IO time: {self.stats["other_io_time"]}\n' + f' Real time: {self.stats["real_time"]}\n' + ) + print(msg) # noqa: T201 + + +class JsonStatsProcessor(BaseStatsProcessor): + """The class collects the stats of all handlers and dumps them into a json file.""" + + _all_stats: 'dict[str, list[HandlerStats]]' = {} + + async def process(self) -> None: + """Add handler stats to all stats dict.""" + handler_name = get_handler_name(self.handler) + try: + self._all_stats[handler_name] + except KeyError: + self._all_stats[handler_name] = [] + + self._all_stats[handler_name].append(self.stats) + + @classmethod + async def on_exit(cls) -> None: + """Dump the accumulated statistics to a file.""" + # we can't use async writing to a file because it won't execute in time + with Path('handler_stats.json').open('w', encoding='utf-8') as f: # noqa: ASYNC230 + json.dump(cls._all_stats, f, ensure_ascii=False, indent=4)