Skip to content

Commit

Permalink
Implement stopwatch feature
Browse files Browse the repository at this point in the history
  • Loading branch information
Themanwhosmellslikesugar committed Dec 19, 2024
1 parent 5055357 commit 3d73d73
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 1 deletion.
4 changes: 4 additions & 0 deletions hammett/conf/global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@
WEBHOOK_URL_PATH = ''

WEBHOOK_URL = ''

HANDLERS_STOPWATCH = False

STOPWATCH_STATS_PROCESSOR = 'hammett.stopwatch.stats.PrintStatsProcessor'
25 changes: 24 additions & 1 deletion hammett/core/application.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""The module contains the implementation of the high-level application class."""

import asyncio
from typing import TYPE_CHECKING, Any

from telegram import Update
Expand Down Expand Up @@ -29,7 +30,7 @@

from telegram.ext import BasePersistence
from telegram.ext._applicationbuilder import ApplicationBuilder
from telegram.ext._utils.types import BD, CD, UD
from telegram.ext._utils.types import BD, BT, CCT, CD, JQ, UD
from typing_extensions import Self

from hammett.core.mixins import StartMixin
Expand Down Expand Up @@ -196,6 +197,12 @@ def _register_handlers(self: 'Self', state: 'State', screens: 'Iterable[type[Scr

setattr(screen, name, apply_permission_to(handler))
instance_handler = getattr(screen(), name)

from hammett.conf import settings
from hammett.stopwatch import collect_handler_statistics
if settings.HANDLERS_STOPWATCH:
instance_handler = collect_handler_statistics(instance_handler)

handler_object = self._get_handler_object(
instance_handler,
handler_type,
Expand Down Expand Up @@ -225,14 +232,30 @@ def _set_default_value_to_native_states(self: 'Self', state: 'State') -> None:
def _setup(self: 'Self') -> None:
"""Configure logging."""
from hammett.conf import settings
from hammett.stopwatch.event_loop import StopWatchEventLoopPolicy
configure_logging(settings.LOGGING)

if settings.HANDLERS_STOPWATCH:
asyncio.set_event_loop_policy(StopWatchEventLoopPolicy())

@staticmethod
async def post_stop(_self: 'NativeApplication[BT, CCT, UD, CD, BD, JQ]') -> None:
"""Run after the `NativeApplication` is stopped."""
from hammett.conf import settings
from hammett.stopwatch.collector import get_stopwatch_stats_processor

if settings.HANDLERS_STOPWATCH:
processor = get_stopwatch_stats_processor()
await processor.on_exit()

def provide_application_builder(self: 'Self') -> 'ApplicationBuilder': # type: ignore[type-arg]
"""Return a native application builder."""
from hammett.conf import settings

return NativeApplication.builder().read_timeout(
settings.APPLICATION_BUILDER_READ_TIMEOUT,
).post_stop(
self.post_stop,
).token(
settings.TOKEN,
)
Expand Down
9 changes: 9 additions & 0 deletions hammett/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,12 @@ class LatestMessage(TypedDict):
chat_id: int
message_id: int
hide_keyboard: bool


class HandlerStats(TypedDict):
"""The class represents information about the handler stats."""

cpu_time: float
select_time: float
other_io_time: float
real_time: float
10 changes: 10 additions & 0 deletions hammett/stopwatch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""The package contains functions to collect handler statistics and further represent them."""

__all__ = (
'StopWatchEventLoopPolicy',
'StopWatchSelector',
'collect_handler_statistics',
)

from hammett.stopwatch.collector import collect_handler_statistics
from hammett.stopwatch.event_loop import StopWatchEventLoopPolicy, StopWatchSelector
84 changes: 84 additions & 0 deletions hammett/stopwatch/collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""The module contains a decorator that wraps handlers to track time."""

import asyncio
import functools
import logging
import time
from typing import TYPE_CHECKING

from hammett.conf import settings
from hammett.core.exceptions import ImproperlyConfigured
from hammett.stopwatch.event_loop import StopWatchSelector
from hammett.utils.handler import wraps_handler
from hammett.utils.module_loading import import_string

if TYPE_CHECKING:
from typing import Any

from hammett.core.constants import HandlerStats
from hammett.stopwatch.stats import BaseStatsProcessor
from hammett.types import Handler

LOGGER = logging.getLogger(__name__)


@functools.cache
def get_stopwatch_stats_processor() -> 'type[BaseStatsProcessor]':
"""Import a stopwatch stats processor by the `STOPWATCH_STATS_PROCESSOR` of the module path
and returns it.
"""
try:
processor = import_string(settings.STOPWATCH_STATS_PROCESSOR)
except ImportError as exc:
msg = (
f'The module could not be imported: {settings.STOPWATCH_STATS_PROCESSOR}. '
f'Check your STOPWATCH_STATS_PROCESSOR setting.'
)
raise ImproperlyConfigured(msg) from exc
else:
return processor


def collect_handler_statistics(func: 'Handler') -> 'Handler':
"""Collect the statistics of the handler and passes them to the `STOPWATCH_STATS`."""
stats_processor = get_stopwatch_stats_processor()

@wraps_handler(func)
async def wrapper(*args: 'Any', **kwargs: 'Any') -> 'Any':
try:
event_loop_selector = getattr(asyncio.get_event_loop(), '_selector') # noqa: B009
except AttributeError:
event_loop_selector = None

if event_loop_selector is None or not isinstance(event_loop_selector, StopWatchSelector):
is_possible_get_select_time = False
LOGGER.error(
'Unable to get select time statistic. Perhaps you have changed the '
'event loop policy?',
)
else:
is_possible_get_select_time = True
event_loop_selector.reset_select_time()

real_time = time.time()
process_time = time.process_time()

response = await func(*args, **kwargs)

real_time = time.time() - real_time
cpu_time = time.process_time() - process_time

select_time = event_loop_selector.select_time if is_possible_get_select_time else 0.0
other_io_time = max(0.0, real_time - cpu_time - select_time)

stats: HandlerStats = {
'cpu_time': cpu_time,
'select_time': select_time,
'other_io_time': other_io_time,
'real_time': real_time,
}
await stats_processor(func, stats).process()

return response

return wrapper
50 changes: 50 additions & 0 deletions hammett/stopwatch/event_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""The module contains the implementation of a modified `EventLoopPolicy` that can track the time
of select.
"""

import asyncio
import selectors
import time
from typing import TYPE_CHECKING, cast

if TYPE_CHECKING:
from asyncio.selector_events import BaseSelectorEventLoop
from selectors import SelectorKey
from typing import Any


class StopWatchSelector(selectors.DefaultSelector):
"""The class implement the tracking of the time of retrieving the task from the event loop."""

def __init__(self, *args: 'Any', **kwargs: 'Any') -> None:
"""Initialize `select_time` attribute."""
self.select_time = 0.0

super().__init__(*args, **kwargs)

def reset_select_time(self) -> None:
"""Reset select_time value."""
self.select_time = 0.0

def select(self, timeout: float | None = None) -> 'list[tuple[SelectorKey, int]]':
"""Track select time if timeout is greater than zero."""
if timeout is not None and timeout <= 0:
return super().select(timeout)

start = time.time()
try:
return super().select(timeout)
finally:
self.select_time += time.time() - start


class StopWatchEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
"""The class implements the `StopWatchSelector` passing in the event loop."""

def new_event_loop(self) -> 'BaseSelectorEventLoop':
"""Create a new event loop and pass `StopWatchSelector`."""
selector = StopWatchSelector()
return cast(
'BaseSelectorEventLoop',
self._loop_factory(selector=selector), # type: ignore[attr-defined]
)
66 changes: 66 additions & 0 deletions hammett/stopwatch/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""The module contains classes that implement the processing of handler statistics."""

import json
from pathlib import Path
from typing import TYPE_CHECKING

from hammett.core.handlers import get_handler_name

if TYPE_CHECKING:
from hammett.core.constants import HandlerStats
from hammett.types import Handler


class BaseStatsProcessor:
"""Base class for implementing handler statistics processing."""

def __init__(self, handler: 'Handler', stats: 'HandlerStats') -> None:
"""Save `handler` and `stats` values to the relevant attributes."""
self.handler = handler
self.stats = stats

@classmethod
async def on_exit(cls) -> None:
"""Run after application is stopped."""

async def process(self) -> None:
"""Run after handler is called and statistics are retrieved."""
raise NotImplementedError


class PrintStatsProcessor(BaseStatsProcessor):
"""The class implements the output of the handler stats via print."""

async def process(self) -> None:
"""Print handler stats."""
msg = (
f'{get_handler_name(self.handler)}:\n'
f' CPU time: {self.stats["cpu_time"]}\n'
f' Select time: {self.stats["select_time"]}\n'
f' Other IO time: {self.stats["other_io_time"]}\n'
f' Real time: {self.stats["real_time"]}\n'
)
print(msg) # noqa: T201


class JsonStatsProcessor(BaseStatsProcessor):
"""The class collects the stats of all handlers and dumps them into a json file."""

_all_stats: 'dict[str, list[HandlerStats]]' = {}

async def process(self) -> None:
"""Add handler stats to all stats dict."""
handler_name = get_handler_name(self.handler)
try:
self._all_stats[handler_name]
except KeyError:
self._all_stats[handler_name] = []

self._all_stats[handler_name].append(self.stats)

@classmethod
async def on_exit(cls) -> None:
"""Dump the accumulated statistics to a file."""
# we can't use async writing to a file because it won't execute in time
with Path('handler_stats.json').open('w', encoding='utf-8') as f: # noqa: ASYNC230
json.dump(cls._all_stats, f, ensure_ascii=False, indent=4)

0 comments on commit 3d73d73

Please sign in to comment.