-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
5055357
commit 2741ed5
Showing
7 changed files
with
247 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
"""The package contains functions to collect handler statistics and further represent them.""" | ||
|
||
__all__ = ( | ||
'StopWatchEventLoopPolicy', | ||
'StopWatchSelector', | ||
'collect_handler_statistics', | ||
) | ||
|
||
from hammett.stopwatch.collector import collect_handler_statistics | ||
from hammett.stopwatch.event_loop import StopWatchEventLoopPolicy, StopWatchSelector |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
"""The module contains a decorator that wraps handlers to track time.""" | ||
|
||
import asyncio | ||
import functools | ||
import logging | ||
import time | ||
from typing import TYPE_CHECKING | ||
|
||
from hammett.conf import settings | ||
from hammett.core.exceptions import ImproperlyConfigured | ||
from hammett.stopwatch.event_loop import StopWatchSelector | ||
from hammett.utils.handler import wraps_handler | ||
from hammett.utils.module_loading import import_string | ||
|
||
if TYPE_CHECKING: | ||
from typing import Any | ||
|
||
from hammett.core.constants import HandlerStats | ||
from hammett.stopwatch.stats import BaseStatsProcessor | ||
from hammett.types import Handler | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
@functools.cache | ||
def get_stopwatch_stats_processor() -> 'type[BaseStatsProcessor]': | ||
"""Import a stopwatch stats processor by the `STOPWATCH_STATS_PROCESSOR` of the module path | ||
and returns it. | ||
""" | ||
try: | ||
processor = import_string(settings.STOPWATCH_STATS_PROCESSOR) | ||
except ImportError as exc: | ||
msg = ( | ||
f'The module could not be imported: {settings.STOPWATCH_STATS_PROCESSOR}. ' | ||
f'Check your STOPWATCH_STATS_PROCESSOR setting.' | ||
) | ||
raise ImproperlyConfigured(msg) from exc | ||
else: | ||
return processor | ||
|
||
|
||
def collect_handler_statistics(func: 'Handler') -> 'Handler': | ||
"""Collect the statistics of the handler and passes them to the `STOPWATCH_STATS`.""" | ||
stats_processor = get_stopwatch_stats_processor() | ||
|
||
@wraps_handler(func) | ||
async def wrapper(*args: 'Any', **kwargs: 'Any') -> 'Any': | ||
try: | ||
event_loop_selector = getattr(asyncio.get_event_loop(), '_selector') # noqa: B009 | ||
except AttributeError: | ||
event_loop_selector = None | ||
|
||
if event_loop_selector is None or not isinstance(event_loop_selector, StopWatchSelector): | ||
is_possible_get_select_time = False | ||
LOGGER.error( | ||
'Unable to get select time statistic. Perhaps you have changed the ' | ||
'event loop policy?', | ||
) | ||
else: | ||
is_possible_get_select_time = True | ||
event_loop_selector.reset_select_time() | ||
|
||
real_time = time.time() | ||
process_time = time.process_time() | ||
|
||
response = await func(*args, **kwargs) | ||
|
||
real_time = time.time() - real_time | ||
cpu_time = time.process_time() - process_time | ||
|
||
select_time = event_loop_selector.select_time if is_possible_get_select_time else 0.0 | ||
other_io_time = max(0.0, real_time - cpu_time - select_time) | ||
|
||
stats: HandlerStats = { | ||
'cpu_time': cpu_time, | ||
'select_time': select_time, | ||
'other_io_time': other_io_time, | ||
'real_time': real_time, | ||
} | ||
await stats_processor(func, stats).process() | ||
|
||
return response | ||
|
||
return wrapper |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
"""The module contains the implementation of a modified `EventLoopPolicy` that can track the time | ||
of select. | ||
""" | ||
|
||
import asyncio | ||
import selectors | ||
import time | ||
from typing import TYPE_CHECKING, cast | ||
|
||
if TYPE_CHECKING: | ||
from asyncio.selector_events import BaseSelectorEventLoop | ||
from selectors import SelectorKey | ||
from typing import Any | ||
|
||
|
||
class StopWatchSelector(selectors.DefaultSelector): | ||
"""The class implement the tracking of the time of retrieving the task from the event loop.""" | ||
|
||
def __init__(self, *args: 'Any', **kwargs: 'Any') -> None: | ||
"""Initialize `select_time` attribute.""" | ||
self.select_time = 0.0 | ||
|
||
super().__init__(*args, **kwargs) | ||
|
||
def reset_select_time(self) -> None: | ||
"""Reset select_time value.""" | ||
self.select_time = 0.0 | ||
|
||
def select(self, timeout: float | None = None) -> 'list[tuple[SelectorKey, int]]': | ||
"""Track select time if timeout is greater than zero.""" | ||
if timeout is not None and timeout <= 0: | ||
return super().select(timeout) | ||
|
||
start = time.time() | ||
try: | ||
return super().select(timeout) | ||
finally: | ||
self.select_time += time.time() - start | ||
|
||
|
||
class StopWatchEventLoopPolicy(asyncio.DefaultEventLoopPolicy): | ||
"""The class implements the `StopWatchSelector` passing in the event loop.""" | ||
|
||
def new_event_loop(self) -> 'BaseSelectorEventLoop': | ||
"""Create a new event loop and pass `StopWatchSelector`.""" | ||
selector = StopWatchSelector() | ||
return cast( | ||
'BaseSelectorEventLoop', | ||
self._loop_factory(selector=selector), # type: ignore[attr-defined] | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
"""The module contains classes that implement the processing of handler statistics.""" | ||
|
||
import json | ||
from pathlib import Path | ||
from typing import TYPE_CHECKING | ||
|
||
from hammett.core.handlers import get_handler_name | ||
|
||
if TYPE_CHECKING: | ||
from hammett.core.constants import HandlerStats | ||
from hammett.types import Handler | ||
|
||
|
||
class BaseStatsProcessor: | ||
"""Base class for implementing handler statistics processing.""" | ||
|
||
def __init__(self, handler: 'Handler', stats: 'HandlerStats') -> None: | ||
"""Save `handler` and `stats` values to the relevant attributes.""" | ||
self.handler = handler | ||
self.stats = stats | ||
|
||
@classmethod | ||
async def on_exit(cls) -> None: | ||
"""Run after application is stopped.""" | ||
|
||
async def process(self) -> None: | ||
"""Run after handler is called and statistics are retrieved.""" | ||
raise NotImplementedError | ||
|
||
|
||
class PrintStatsProcessor(BaseStatsProcessor): | ||
"""The class implements the output of the handler stats via print.""" | ||
|
||
async def process(self) -> None: | ||
"""Print handler stats.""" | ||
msg = ( | ||
f'{get_handler_name(self.handler)}:\n' | ||
f' CPU time: {self.stats["cpu_time"]}\n' | ||
f' Select time: {self.stats["select_time"]}\n' | ||
f' Other IO time: {self.stats["other_io_time"]}\n' | ||
f' Real time: {self.stats["real_time"]}\n' | ||
) | ||
print(msg) # noqa: T201 | ||
|
||
|
||
class JsonStatsProcessor(BaseStatsProcessor): | ||
"""The class collects the stats of all handlers and dumps them into a json file.""" | ||
|
||
_all_stats: 'dict[str, list[HandlerStats]]' = {} | ||
|
||
async def process(self) -> None: | ||
"""Add handler stats to all stats dict.""" | ||
handler_name = get_handler_name(self.handler) | ||
try: | ||
self._all_stats[handler_name] | ||
except KeyError: | ||
self._all_stats[handler_name] = [] | ||
|
||
self._all_stats[handler_name].append(self.stats) | ||
|
||
@classmethod | ||
async def on_exit(cls) -> None: | ||
"""Dump the accumulated statistics to a file.""" | ||
# we can't use async writing to a file because it won't execute in time | ||
with Path('handler_stats.json').open('w', encoding='utf-8') as f: # noqa: ASYNC230 | ||
json.dump(cls._all_stats, f, ensure_ascii=False, indent=4) |