Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: raise inner exceptions in with_real tests #751

Merged
merged 17 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/en/getting-started/subscription/test.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ If you want to test your application in a real environment, you shouldn't have t
When you're using a patched broker to test your consumers, the publish method is called synchronously with a consumer one, so you need not wait until your message is consumed. But in the real broker's case, it doesn't.

For this reason, you have to wait for message consumption manually with the special `#!python handler.wait_call(timeout)` method.
Also, inner handler exceptions will be raised in this function, not `#!python broker.publish(...)`.

### A Little Tip

Expand Down
35 changes: 2 additions & 33 deletions faststream/broker/core/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class BrokerUsecase(
subscriber : decorator to register a subscriber
publisher : register a publisher
_wrap_decode_message : wrap a message decoding function
_log_execution : log the execution of a function
!!! note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Expand Down Expand Up @@ -165,10 +164,10 @@ def __init__(
midd_args: Sequence[Callable[[MsgType], BaseMiddleware]] = (
middlewares or empty_middleware
)
self.middlewares = [CriticalLogMiddleware(logger), *midd_args]
self.middlewares = [CriticalLogMiddleware(logger, log_level), *midd_args]
self.dependencies = dependencies

self._connection_args = (url, *args)
self._connection_args = args
self._connection_kwargs = kwargs

self._global_parser = parser
Expand Down Expand Up @@ -255,7 +254,6 @@ def _wrap_handler(
extra_dependencies: Sequence[Depends] = (),
_raw: bool = False,
_get_dependant: Optional[Any] = None,
**broker_log_context_kwargs: Any,
) -> Tuple[
HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
Union[
Expand Down Expand Up @@ -329,9 +327,6 @@ def _wrap_handler(
watcher=get_watcher(self.logger, retry),
)

if self.logger is not None:
process_f = self._log_execution(process_f, **broker_log_context_kwargs)

process_f = set_message_context(process_f)

handler_call.set_wrapped(process_f)
Expand Down Expand Up @@ -523,32 +518,6 @@ def _wrap_decode_message(
"""
raise NotImplementedError()

@abstractmethod
def _log_execution(
self,
func: Callable[
[StreamMessage[MsgType]],
Awaitable[WrappedReturn[T_HandlerReturn]],
],
**broker_args: Any,
) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]],]:
"""Log the execution of a function.

Args:
func: The function to be logged.
**broker_args: Additional arguments to be passed to the function.

Returns:
A decorated version of the function that logs its execution.

Raises:
NotImplementedError: If the function is not implemented.
!!! note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
raise NotImplementedError()


def extend_dependencies(
extra: Sequence[CallModel[Any, Any]], dependant: CallModel[Any, Any]
Expand Down
75 changes: 3 additions & 72 deletions faststream/broker/core/asyncronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
cast,
)

import anyio
from fast_depends.core import CallModel
from fast_depends.dependencies import Depends

Expand All @@ -40,10 +39,9 @@
WrappedReturn,
)
from faststream.broker.wrapper import HandlerCallWrapper
from faststream.exceptions import AckMessage, NackMessage, RejectMessage, SkipMessage
from faststream.exceptions import AckMessage, NackMessage, RejectMessage
from faststream.log import access_logger
from faststream.types import SendableMessage
from faststream.utils import context
from faststream.utils.functions import to_async


Expand Down Expand Up @@ -92,8 +90,8 @@ class BrokerAsyncUsecase(BrokerUsecase[MsgType, ConnectionType]):
async def start(self) -> None:
super()._abc_start()
for h in self.handlers.values():
for c, _, _, _, _, _ in h.calls:
c.event = anyio.Event()
for f, _, _, _, _, _ in h.calls:
f.refresh(with_mock=False)
await self.connect()

@abstractmethod
Expand Down Expand Up @@ -431,7 +429,6 @@ def _wrap_handler(
extra_dependencies: Sequence[Depends] = (),
_raw: bool = False,
_get_dependant: Optional[Any] = None,
**broker_log_context_kwargs: Any,
) -> Tuple[
HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
CallModel[P_HandlerParams, T_HandlerReturn],
Expand All @@ -458,7 +455,6 @@ def _wrap_handler(
extra_dependencies=extra_dependencies,
_raw=_raw,
_get_dependant=_get_dependant,
**broker_log_context_kwargs,
)

async def _execute_handler(
Expand Down Expand Up @@ -494,68 +490,3 @@ async def _execute_handler(
except RejectMessage as e:
await message.reject()
raise e

@override
def _log_execution(
self,
func: Callable[
[StreamMessage[MsgType]],
Awaitable[WrappedReturn[T_HandlerReturn]],
],
**broker_args: Any,
) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]]]:
"""Decorator to log the execution of a function.

Args:
func: The function to be decorated.

Returns:
The decorated function.

Raises:
SkipMessage: If the message should be skipped.

Note:
This decorator logs the execution of the function, including the received message and any exceptions that occur during execution.
!!! note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""

@wraps(func)
async def log_wrapper(
message: StreamMessage[MsgType],
) -> WrappedReturn[T_HandlerReturn]:
"""A wrapper function for logging.

Args:
message : The message to be logged.

Returns:
The wrapped return value.

Raises:
SkipMessage: If the message is to be skipped.
Exception: If an exception occurs during processing.
!!! note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
log_context = self._get_log_context(message=message, **broker_args)

with context.scope("log_context", log_context):
self._log("Received", extra=log_context)

try:
r = await func(message)
except SkipMessage as e:
self._log("Skipped", extra=log_context)
raise e
except Exception as e:
self._log(f"{type(e).__name__}: {e}", logging.ERROR, exc_info=e)
raise e
else:
self._log("Processed", extra=log_context)
return r

return log_wrapper
25 changes: 23 additions & 2 deletions faststream/broker/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Any,
Awaitable,
Callable,
Dict,
Generic,
List,
Optional,
Expand Down Expand Up @@ -34,8 +35,9 @@
WrappedReturn,
)
from faststream.broker.wrapper import HandlerCallWrapper
from faststream.exceptions import StopConsume
from faststream.exceptions import HandlerException, StopConsume
from faststream.types import SendableMessage
from faststream.utils.context.main import context
from faststream.utils.functions import to_async


Expand Down Expand Up @@ -87,6 +89,7 @@ class BaseHandler(AsyncAPIOperation, Generic[MsgType]):
def __init__(
self,
*,
log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
description: Optional[str] = None,
title: Optional[str] = None,
):
Expand All @@ -104,6 +107,7 @@ def __init__(
# AsyncAPI information
self._description = description
self._title = title
self.log_context_builder = log_context_builder

@override
@property
Expand Down Expand Up @@ -245,6 +249,7 @@ async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[overri
result: Optional[WrappedReturn[SendableMessage]] = None
result_msg: SendableMessage = None

logged = False
async with AsyncExitStack() as stack:
gl_middlewares: List[BaseMiddleware] = []

Expand All @@ -263,6 +268,12 @@ async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[overri

# TODO: add parser & decoder cashes
message = await parser(msg)

if not logged:
log_context_tag = context.set_local(
"log_context", self.log_context_builder(message)
)

message.decoded_body = await decoder(message)
message.processed = processed

Expand Down Expand Up @@ -308,15 +319,25 @@ async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[overri

except StopConsume:
await self.close()
return None
handler.trigger()

except HandlerException as e:
handler.trigger()
raise e

except Exception as e:
handler.trigger(error=e)
raise e

else:
handler.trigger(result=result[0] if result else None)
message.processed = processed = True
if IS_OPTIMIZED: # pragma: no cover
break

assert processed, "You have to consume message" # nosec B101

context.reset_local("log_context", log_context_tag)
return result_msg

@abstractmethod
Expand Down
26 changes: 23 additions & 3 deletions faststream/broker/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from faststream._compat import Self
from faststream.types import DecodedMessage, SendableMessage
from faststream.utils.context.main import context


class BaseMiddleware:
Expand Down Expand Up @@ -245,7 +246,7 @@ class CriticalLogMiddleware(BaseMiddleware):
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""

def __init__(self, logger: Optional[logging.Logger]) -> None:
def __init__(self, logger: Optional[logging.Logger], log_level: int) -> None:
"""Initialize the class.

Args:
Expand All @@ -258,6 +259,8 @@ def __init__(self, logger: Optional[logging.Logger]) -> None:
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
self.logger = logger
self.log_level = log_level
self.is_log_enabled = bool(logger and logger.isEnabledFor(self.log_level))

def __call__(self, msg: Any) -> Self:
"""Call the object with a message.
Expand All @@ -273,6 +276,13 @@ def __call__(self, msg: Any) -> Self:
"""
return self

async def on_consume(self, msg: DecodedMessage) -> DecodedMessage:
if self.is_log_enabled and self.logger is not None:
c = context.get("log_context")
self.logger._log(self.log_level, msg="Received", args=(), extra=c)

return await super().on_consume(msg)

async def after_processed(
self,
exc_type: Optional[Type[BaseException]] = None,
Expand All @@ -292,6 +302,16 @@ async def after_processed(

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
if exc_type and exc_val and self.logger:
self.logger.critical(f"{exc_type.__name__}: {exc_val}", exc_info=exc_val)
if self.logger is not None:
c = context.get("log_context")

if exc_type and exc_val:
self.logger.error(
f"{exc_type.__name__}: {exc_val}",
exc_info=exc_val,
extra=c,
)

if self.is_log_enabled:
self.logger._log(self.log_level, msg="Processed", args=(), extra=c)
return True
4 changes: 2 additions & 2 deletions faststream/broker/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async def __aexit__(
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
self._event.set()
await self._task.__aexit__(exc_type, exc_val, exec_tb)
await self._task.__aexit__(None, None, None)


def patch_broker_calls(broker: BrokerUsecase[Any, Any]) -> None:
Expand All @@ -105,7 +105,7 @@ def patch_broker_calls(broker: BrokerUsecase[Any, Any]) -> None:

for handler in broker.handlers.values():
for f, _, _, _, _, _ in handler.calls:
f.event = anyio.Event()
f.refresh(with_mock=False)


async def call_handler(
Expand Down
Loading