-
-
Notifications
You must be signed in to change notification settings - Fork 31.5k
/
logging.py
197 lines (151 loc) · 6.05 KB
/
logging.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""Logging utilities."""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
from functools import partial, wraps
import inspect
import logging
import logging.handlers
import queue
import traceback
from typing import Any, TypeVar, cast, overload
from homeassistant.core import HomeAssistant, callback, is_callback
_T = TypeVar("_T")
class HomeAssistantQueueHandler(logging.handlers.QueueHandler):
    """Queue handler that hands log records off to another thread."""

    # Listener draining the queue; stopped and cleared again by close().
    listener: logging.handlers.QueueListener | None = None

    def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
        """Return the record readied for queuing, with stack_info cleared.

        Clearing stack_info is a workaround for
        https://bugs.python.org/issue46755
        """
        prepared = super().prepare(record)
        prepared.stack_info = None
        return prepared

    def handle(self, record: logging.LogRecord) -> Any:
        """Emit the record if the handler's filters accept it.

        The filter result is returned either way. The default python
        logger Handler acquires a lock around the emit; that is skipped
        here because SimpleQueue is already thread safe.

        See https://bugs.python.org/issue24645
        """
        accepted = self.filter(record)
        if not accepted:
            return accepted
        self.emit(record)
        return accepted

    def close(self) -> None:
        """Tidy up the handler and shut down the QueueListener, if any."""
        super().close()
        active_listener = self.listener
        if active_listener:
            active_listener.stop()
            self.listener = None
@callback
def async_activate_log_queue_handler(hass: HomeAssistant) -> None:
    """Route root-logger output through a queue serviced by another thread.

    Every handler attached to the root logger is detached and handed to a
    QueueListener, leaving only the queue handler on the root logger. This
    avoids blocking I/O and message formatting in the event loop, as log
    messages are written in another thread.

    hass: accepted for the call signature; not used by this function.
    """
    record_queue: queue.SimpleQueue[logging.LogRecord] = queue.SimpleQueue()
    queue_handler = HomeAssistantQueueHandler(record_queue)
    root_logger = logging.root
    root_logger.addHandler(queue_handler)

    # Snapshot first: the handlers are removed from the root logger below.
    migrated_handlers: list[logging.Handler] = [
        existing
        for existing in list(root_logger.handlers)
        if existing is not queue_handler
    ]
    for existing in migrated_handlers:
        root_logger.removeHandler(existing)

    listener = logging.handlers.QueueListener(record_queue, *migrated_handlers)
    queue_handler.listener = listener
    listener.start()
def log_exception(format_err: Callable[..., Any], *args: Any) -> None:
    """Log the exception currently being handled, prefixed with context.

    format_err is called with *args to build the leading message. That
    message and the formatted traceback are logged at error level on the
    logger named after the caller's module.
    """
    caller_module = inspect.getmodule(inspect.stack(context=0)[1].frame)
    # If Python is unable to access the source files, the call stack frame
    # will be missing information, so fall back to this module's name.
    # https://github.com/home-assistant/core/issues/24982
    logger_name = __name__ if caller_module is None else caller_module.__name__

    # Do not print the wrapper in the traceback: a negative limit keeps
    # only the innermost entries.
    frame_limit = 1 - len(inspect.trace())
    formatted_exc = traceback.format_exc(frame_limit)
    context_msg = format_err(*args)
    logging.getLogger(logger_name).error("%s\n%s", context_msg, formatted_exc)
@overload
def catch_log_exception(
    func: Callable[..., Coroutine[Any, Any, Any]], format_err: Callable[..., Any]
) -> Callable[..., Coroutine[Any, Any, None]]:
    ...


@overload
def catch_log_exception(
    func: Callable[..., Any], format_err: Callable[..., Any]
) -> Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]:
    ...


def catch_log_exception(
    func: Callable[..., Any], format_err: Callable[..., Any]
) -> Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]:
    """Wrap func so that exceptions it raises are logged, not propagated.

    The wrapper forwards its positional arguments to func and discards the
    result. On failure, format_err is given the same arguments to build the
    log message.

    A coroutine function yields a coroutine function wrapper.
    A callback yields a wrapper that is marked as a callback too.
    """
    # Look through partials so the wrapped callable's real kind is detected
    innermost = func
    while isinstance(innermost, partial):
        innermost = innermost.func

    if not asyncio.iscoroutinefunction(innermost):

        @wraps(func)
        def wrapper(*args: Any) -> None:
            """Catch and log exception."""
            try:
                func(*args)
            except Exception:  # pylint: disable=broad-except
                log_exception(format_err, *args)

        return callback(wrapper) if is_callback(innermost) else wrapper

    async_func = cast(Callable[..., Coroutine[Any, Any, None]], func)

    @wraps(async_func)
    async def async_wrapper(*args: Any) -> None:
        """Catch and log exception."""
        try:
            await async_func(*args)
        except Exception:  # pylint: disable=broad-except
            log_exception(format_err, *args)

    return async_wrapper
def catch_log_coro_exception(
    target: Coroutine[Any, Any, _T], format_err: Callable[..., Any], *args: Any
) -> Coroutine[Any, Any, _T | None]:
    """Wrap a coroutine object so exceptions it raises are logged.

    The returned coroutine resolves to target's result, or to None when
    target raised. In that case format_err is called with *args to build
    the log message.
    """

    async def coro_wrapper(*wrapped_args: Any) -> _T | None:
        """Catch and log exception."""
        outcome: _T | None = None
        try:
            outcome = await target
        except Exception:  # pylint: disable=broad-except
            log_exception(format_err, *wrapped_args)
        return outcome

    return coro_wrapper(*args)
def async_create_catching_coro(
    target: Coroutine[Any, Any, _T]
) -> Coroutine[Any, Any, _T | None]:
    """Return target wrapped so that its exceptions are caught and logged.

    The log entry names the coroutine and includes the stack trace of the
    place where it was wrapped, i.e. the caller of this function.

    target: target coroutine.
    """
    # Capture the stack now, at wrap time; it is only formatted on failure.
    wrap_site = traceback.extract_stack()

    def _describe_failure() -> str:
        """Build the log message; only invoked if target raises."""
        coro_name = target.__name__
        # Drop the last entry, which is this function's own frame.
        origin = "".join(traceback.format_list(wrap_site[:-1]))
        return "Exception in {} called from\n {}".format(coro_name, origin)

    return catch_log_coro_exception(target, _describe_failure)