From c2bf331f5ab651fb9fdff3253003a54538305461 Mon Sep 17 00:00:00 2001 From: Dos Moonen Date: Fri, 13 Sep 2024 18:44:19 +0200 Subject: [PATCH] Fix two bugs by adding more type hints to `CallbackCollection`. The first bug is `Channel` passing `Optional[BaseException]` to `self.close()` while `RobustChannel` passed `asyncio.Future` The second is registering a `CallbackCollection` instance as a callback for a different `CallbackCollection`. (Which was not supported before) --- aio_pika/abc.py | 30 ++++++--- aio_pika/channel.py | 3 +- aio_pika/message.py | 8 +-- aio_pika/patterns/master.py | 6 +- aio_pika/patterns/rpc.py | 9 ++- aio_pika/queue.py | 8 +-- aio_pika/robust_channel.py | 2 +- aio_pika/robust_connection.py | 6 +- aio_pika/tools.py | 114 +++++++++++++++++++++++++++----- docs/source/examples/pooling.py | 2 +- poetry.lock | 69 +++++++++---------- pyproject.toml | 2 +- tests/test_amqp.py | 5 +- tests/test_amqp_robust.py | 2 +- tests/test_amqp_robust_proxy.py | 6 +- 15 files changed, 183 insertions(+), 89 deletions(-) diff --git a/aio_pika/abc.py b/aio_pika/abc.py index 95873337..36ca1ee5 100644 --- a/aio_pika/abc.py +++ b/aio_pika/abc.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import dataclasses from abc import ABC, abstractmethod @@ -248,7 +250,10 @@ class AbstractQueue: arguments: Arguments passive: bool declaration_result: aiormq.spec.Queue.DeclareOk - close_callbacks: CallbackCollection + close_callbacks: CallbackCollection[ + AbstractQueue, + [Optional[BaseException]], + ] @abstractmethod def __init__( @@ -504,8 +509,14 @@ class AbstractChannel(PoolInstance, ABC): QUEUE_CLASS: Type[AbstractQueue] EXCHANGE_CLASS: Type[AbstractExchange] - close_callbacks: CallbackCollection - return_callbacks: CallbackCollection + close_callbacks: CallbackCollection[ + AbstractChannel, + [Optional[BaseException]], + ] + return_callbacks: CallbackCollection[ + AbstractChannel, + [AbstractIncomingMessage], + ] default_exchange: AbstractExchange publisher_confirms: bool @@ -715,7 +726,10 @@ def parse(self, value: Optional[str]) -> Any: class AbstractConnection(PoolInstance, ABC): PARAMETERS: Tuple[ConnectionParameter, ...] - close_callbacks: CallbackCollection + close_callbacks: CallbackCollection[ + AbstractConnection, + [Optional[BaseException]], + ] connected: asyncio.Event transport: Optional[UnderlayConnection] kwargs: Mapping[str, Any] @@ -832,7 +846,7 @@ async def bind( class AbstractRobustChannel(AbstractChannel): - reopen_callbacks: CallbackCollection + reopen_callbacks: CallbackCollection[AbstractRobustChannel, []] @abstractmethod def reopen(self) -> Awaitable[None]: @@ -875,7 +889,7 @@ async def declare_queue( class AbstractRobustConnection(AbstractConnection): - reconnect_callbacks: CallbackCollection + reconnect_callbacks: CallbackCollection[AbstractRobustConnection, []] @property @abstractmethod @@ -897,10 +911,10 @@ def channel( ChannelCloseCallback = Callable[ - [AbstractChannel, Optional[BaseException]], Any, + [Optional[AbstractChannel], Optional[BaseException]], Any, ] ConnectionCloseCallback = Callable[ - [AbstractConnection, Optional[BaseException]], Any, + [Optional[AbstractConnection], Optional[BaseException]], Any, ] ConnectionType = TypeVar("ConnectionType", bound=AbstractConnection) diff --git a/aio_pika/channel.py b/aio_pika/channel.py index 3f9d44ae..b0e73c67 100644 --- a/aio_pika/channel.py +++ b/aio_pika/channel.py @@ -226,7 +226,8 @@ async def _on_close( async def _set_closed_callback( self, - _: AbstractChannel, exc: BaseException + _: Optional[AbstractChannel], + exc: Optional[BaseException], ) -> None: if not self._closed.done(): self._closed.set_result(True) diff --git a/aio_pika/message.py b/aio_pika/message.py index 2066916f..7d92c5a9 100644 --- a/aio_pika/message.py +++ b/aio_pika/message.py @@ -47,7 +47,7 @@ def encode_expiration_timedelta(value: timedelta) -> str: return str(int(value.total_seconds() * MILLISECONDS)) -@encode_expiration.register(NoneType) # type: ignore +@encode_expiration.register(NoneType) def encode_expiration_none(_: Any) -> None: return None @@ -62,7 +62,7 @@ def decode_expiration_str(t: str) -> float: return float(t) / MILLISECONDS -@decode_expiration.register(NoneType) # type: ignore +@decode_expiration.register(NoneType) def decode_expiration_none(_: Any) -> None: return None @@ -88,7 +88,7 @@ def encode_timestamp_timedelta(value: timedelta) -> datetime: return datetime.now(tz=timezone.utc) + value -@encode_timestamp.register(NoneType) # type: ignore +@encode_timestamp.register(NoneType) def encode_timestamp_none(_: Any) -> None: return None @@ -103,7 +103,7 @@ def decode_timestamp_datetime(value: datetime) -> datetime: return value -@decode_timestamp.register(NoneType) # type: ignore +@decode_timestamp.register(NoneType) def decode_timestamp_none(_: Any) -> None: return None diff --git a/aio_pika/patterns/master.py b/aio_pika/patterns/master.py index e46e4ee5..a7a70e14 100644 --- a/aio_pika/patterns/master.py +++ b/aio_pika/patterns/master.py @@ -12,7 +12,7 @@ AbstractChannel, AbstractExchange, AbstractIncomingMessage, AbstractQueue, ConsumerTag, DeliveryMode, ) -from aio_pika.message import Message, ReturnedMessage +from aio_pika.message import Message from ..tools import create_task, ensure_awaitable from .base import Base, CallbackType, Proxy, T @@ -113,8 +113,8 @@ def exchange(self) -> AbstractExchange: @staticmethod def on_message_returned( - channel: AbstractChannel, - message: ReturnedMessage, + channel: Optional[AbstractChannel], + message: AbstractIncomingMessage, ) -> None: log.warning( "Message returned. Probably destination queue does not exists: %r", diff --git a/aio_pika/patterns/rpc.py b/aio_pika/patterns/rpc.py index d9c8b07d..91a336f1 100644 --- a/aio_pika/patterns/rpc.py +++ b/aio_pika/patterns/rpc.py @@ -15,7 +15,7 @@ ) from aio_pika.exceptions import MessageProcessError from aio_pika.exchange import ExchangeType -from aio_pika.message import IncomingMessage, Message, ReturnedMessage +from aio_pika.message import IncomingMessage, Message from ..tools import ensure_awaitable from .base import Base, CallbackType, Proxy, T @@ -193,7 +193,8 @@ async def initialize( self.channel.return_callbacks.add(self.on_message_returned) def on_close( - self, channel: AbstractChannel, + self, + channel: Optional[AbstractChannel], exc: Optional[ExceptionType] = None, ) -> None: log.debug("Closing RPC futures because %r", exc) @@ -218,7 +219,9 @@ async def create(cls, channel: AbstractChannel, **kwargs: Any) -> "RPC": return rpc def on_message_returned( - self, channel: AbstractChannel, message: ReturnedMessage, + self, + channel: Optional[AbstractChannel], + message: AbstractIncomingMessage, ) -> None: if message.correlation_id is None: log.warning( diff --git a/aio_pika/queue.py b/aio_pika/queue.py index fca449c9..9b4cd3de 100644 --- a/aio_pika/queue.py +++ b/aio_pika/queue.py @@ -418,13 +418,13 @@ def consumer_tag(self) -> Optional[ConsumerTag]: return getattr(self, "_consumer_tag", None) async def close(self) -> None: - await self._on_close(self._amqp_queue.channel, None) + await self._on_close(self._amqp_queue, None) if not self._closed.done(): self._closed.set_result(True) async def _set_closed( self, - _channel: AbstractChannel, + _channel: Optional[AbstractQueue], exc: Optional[BaseException] ) -> None: if not self._closed.done(): @@ -432,7 +432,7 @@ async def _set_closed( async def _on_close( self, - _channel: AbstractChannel, + _channel: Optional[AbstractQueue], _exc: Optional[BaseException] ) -> None: log.debug("Cancelling queue iterator %r", self) @@ -503,7 +503,7 @@ def __repr__(self) -> str: def __init__(self, queue: Queue, **kwargs: Any): self._consumer_tag: ConsumerTag - self._amqp_queue: AbstractQueue = queue + self._amqp_queue: Queue = queue self._queue = asyncio.Queue() self._closed = asyncio.get_running_loop().create_future() self._message_or_closed = asyncio.Event() diff --git a/aio_pika/robust_channel.py b/aio_pika/robust_channel.py index 0ab22edf..77114696 100644 --- a/aio_pika/robust_channel.py +++ b/aio_pika/robust_channel.py @@ -66,7 +66,7 @@ def __init__( self._prefetch_count: int = 0 self._prefetch_size: int = 0 self._global_qos: bool = False - self.reopen_callbacks: CallbackCollection = CallbackCollection(self) + self.reopen_callbacks = CallbackCollection(self) self.__restore_lock = asyncio.Lock() self.__restored = asyncio.Event() diff --git a/aio_pika/robust_connection.py b/aio_pika/robust_connection.py index 091f7bc4..1f1e9366 100644 --- a/aio_pika/robust_connection.py +++ b/aio_pika/robust_connection.py @@ -60,7 +60,7 @@ def __init__( self.__reconnection_task: Optional[asyncio.Task] = None self._reconnect_lock = asyncio.Lock() - self.reconnect_callbacks: CallbackCollection = CallbackCollection(self) + self.reconnect_callbacks = CallbackCollection(self) self.__connection_close_event.set() @@ -104,9 +104,7 @@ async def _on_connected(self) -> None: log.exception("Failed to reopen channel") raise except Exception as e: - closing = self.loop.create_future() - closing.set_exception(e) - await self.close_callbacks(closing) + await self.close_callbacks(e) await asyncio.gather( transport.connection.close(e), return_exceptions=True, diff --git a/aio_pika/tools.py b/aio_pika/tools.py index 890a794b..c60c3ce2 100644 --- a/aio_pika/tools.py +++ b/aio_pika/tools.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import inspect import warnings @@ -5,9 +7,12 @@ from itertools import chain from threading import Lock from typing import ( - AbstractSet, Any, Awaitable, Callable, Coroutine, Generator, Iterator, List, - MutableSet, Optional, TypeVar, Union, + AbstractSet, Any, Awaitable, Callable, Coroutine, Generator, Iterator, + List, + MutableSet, Optional, TypeVar, Union, Generic, ) + +from typing_extensions import ParamSpec, Protocol from weakref import ReferenceType, WeakSet, ref from aio_pika.log import get_logger @@ -66,8 +71,20 @@ def run(future: asyncio.Future) -> Optional[asyncio.Future]: return future -CallbackType = Callable[..., Union[T, Awaitable[T]]] -CallbackSetType = AbstractSet[CallbackType] +_Sender = TypeVar("_Sender", contravariant=True) +_Params = ParamSpec("_Params") +_Return = TypeVar("_Return", covariant=True) + + +class CallbackType(Protocol[_Sender, _Params, _Return]): + def __call__( + self, + __sender: Optional[_Sender], + /, + *args: _Params.args, + **kwargs: _Params.kwargs, + ) -> Union[_Return, Awaitable[_Return]]: + ... class StubAwaitable: @@ -80,7 +97,15 @@ def __await__(self) -> Generator[Any, Any, None]: STUB_AWAITABLE = StubAwaitable() -class CallbackCollection(MutableSet): +class CallbackCollection( + MutableSet[ + Union[ + CallbackType[_Sender, _Params, Any], + "CallbackCollection[Any, _Params]", + ], + ], + Generic[_Sender, _Params], +): __slots__ = ( "__weakref__", "__sender", @@ -89,7 +114,7 @@ class CallbackCollection(MutableSet): "__lock", ) - def __init__(self, sender: Union[T, ReferenceType]): + def __init__(self, sender: Union[_Sender, ReferenceType[_Sender]]): self.__sender: ReferenceType if isinstance(sender, ReferenceType): self.__sender = sender @@ -97,10 +122,22 @@ def __init__(self, sender: Union[T, ReferenceType]): self.__sender = ref(sender) self.__callbacks: CallbackSetType = set() - self.__weak_callbacks: MutableSet[CallbackType] = WeakSet() + self.__weak_callbacks: MutableSet[ + Union[ + CallbackType[_Sender, _Params, Any], + CallbackCollection[Any, _Params], + ], + ] = WeakSet() self.__lock: Lock = Lock() - def add(self, callback: CallbackType, weak: bool = False) -> None: + def add( + self, + callback: Union[ + CallbackType[_Sender, _Params, Any], + CallbackCollection[Any, _Params], + ], + weak: bool = False + ) -> None: if self.is_frozen: raise RuntimeError("Collection frozen") if not callable(callback): @@ -112,7 +149,29 @@ def add(self, callback: CallbackType, weak: bool = False) -> None: else: self.__callbacks.add(callback) # type: ignore - def discard(self, callback: CallbackType) -> None: + def remove( + self, + callback: Union[ + CallbackType[_Sender, _Params, Any], + CallbackCollection[Any, _Params], + ], + ) -> None: + if self.is_frozen: + raise RuntimeError("Collection frozen") + + with self.__lock: + try: + self.__callbacks.remove(callback) # type: ignore + except KeyError: + self.__weak_callbacks.remove(callback) + + def discard( + self, + callback: Union[ + CallbackType[_Sender, _Params, Any], + CallbackCollection[Any, _Params], + ], + ) -> None: if self.is_frozen: raise RuntimeError("Collection frozen") @@ -156,13 +215,18 @@ def __contains__(self, x: object) -> bool: def __len__(self) -> int: return len(self.__callbacks) + len(self.__weak_callbacks) - def __iter__(self) -> Iterator[CallbackType]: + def __iter__(self) -> Iterator[ + Union[ + CallbackType[_Sender, _Params, Any], + CallbackCollection[_Sender, _Params], + ], + ]: return iter(chain(self.__callbacks, self.__weak_callbacks)) def __bool__(self) -> bool: return bool(self.__callbacks) or bool(self.__weak_callbacks) - def __copy__(self) -> "CallbackCollection": + def __copy__(self) -> CallbackCollection[_Sender, _Params]: instance = self.__class__(self.__sender) with self.__lock: @@ -177,7 +241,11 @@ def __copy__(self) -> "CallbackCollection": return instance - def __call__(self, *args: Any, **kwargs: Any) -> Awaitable[Any]: + def __call__( + self, + *args: _Params.args, + **kwargs: _Params.kwargs, + ) -> Awaitable[Any]: futures: List[asyncio.Future] = [] with self.__lock: @@ -185,14 +253,18 @@ def __call__(self, *args: Any, **kwargs: Any) -> Awaitable[Any]: for cb in self: try: - result = cb(sender, *args, **kwargs) - if hasattr(result, "__await__"): + if isinstance(cb, CallbackCollection): + result = cb(*args, **kwargs) + else: + result = cb(sender, *args, **kwargs) + if inspect.isawaitable(result): futures.append(asyncio.ensure_future(result)) except Exception: log.exception("Callback %r error", cb) if not futures: return STUB_AWAITABLE + return asyncio.gather(*futures, return_exceptions=True) def __hash__(self) -> int: @@ -242,8 +314,8 @@ def __call__(self, *args: Any, **kwargs: Any) -> Awaitable[Any]: def ensure_awaitable( - func: Callable[..., Union[T, Awaitable[T]]], -) -> Callable[..., Awaitable[T]]: + func: Callable[_Params, Union[T, Awaitable[T]]], +) -> Callable[_Params, Awaitable[T]]: if inspect.iscoroutinefunction(func): return func @@ -256,7 +328,7 @@ def ensure_awaitable( ) @wraps(func) - async def wrapper(*args: Any, **kwargs: Any) -> T: + async def wrapper(*args: _Params.args, **kwargs: _Params.kwargs) -> T: nonlocal func result = func(*args, **kwargs) @@ -277,6 +349,14 @@ async def wrapper(*args: Any, **kwargs: Any) -> T: return wrapper +CallbackSetType = AbstractSet[ + Union[ + CallbackType[_Sender, _Params, None], + CallbackCollection[_Sender, _Params], + ], +] + + __all__ = ( "CallbackCollection", "CallbackSetType", diff --git a/docs/source/examples/pooling.py b/docs/source/examples/pooling.py index af8d5c3c..d0e18b08 100644 --- a/docs/source/examples/pooling.py +++ b/docs/source/examples/pooling.py @@ -40,7 +40,7 @@ async def publish() -> None: async with connection_pool, channel_pool: task = asyncio.create_task(consume()) - await asyncio.wait([publish() for _ in range(50)]) + await asyncio.wait([asyncio.create_task(publish()) for _ in range(50)]) await task diff --git a/poetry.lock b/poetry.lock index cfc6a78d..f94c176a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -709,52 +709,49 @@ files = [ [[package]] name = "mypy" -version = "0.991" +version = "1.11.2" description = "Optional static typing for Python" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "mypy-0.991-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:7d17e0a9707d0772f4a7b878f04b4fd11f6f5bcb9b3813975a9b13c9332153ab"}, - {file = "mypy-0.991-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:0714258640194d75677e86c786e80ccf294972cc76885d3ebbb560f11db0003d"}, - {file = "mypy-0.991-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:0c8f3be99e8a8bd403caa8c03be619544bc2c77a7093685dcf308c6b109426c6"}, - {file = "mypy-0.991-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc9ec663ed6c8f15f4ae9d3c04c989b744436c16d26580eaa760ae9dd5d662eb"}, - {file = "mypy-0.991-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:4307270436fd7694b41f913eb09210faff27ea4979ecbcd849e57d2da2f65305"}, - {file = "mypy-0.991-cp310-cp310-win_amd64.whl", hash = "sha256:901c2c269c616e6cb0998b33d4adbb4a6af0ac4ce5cd078afd7bc95830e62c1c"}, - {file = "mypy-0.991-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:d13674f3fb73805ba0c45eb6c0c3053d218aa1f7abead6e446d474529aafc372"}, - {file = "mypy-0.991-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:1c8cd4fb70e8584ca1ed5805cbc7c017a3d1a29fb450621089ffed3e99d1857f"}, - {file = "mypy-0.991-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:209ee89fbb0deed518605edddd234af80506aec932ad28d73c08f1400ef80a33"}, - {file = "mypy-0.991-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:37bd02ebf9d10e05b00d71302d2c2e6ca333e6c2a8584a98c00e038db8121f05"}, - {file = "mypy-0.991-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:26efb2fcc6b67e4d5a55561f39176821d2adf88f2745ddc72751b7890f3194ad"}, - {file = "mypy-0.991-cp311-cp311-win_amd64.whl", hash = "sha256:3a700330b567114b673cf8ee7388e949f843b356a73b5ab22dd7cff4742a5297"}, - {file = "mypy-0.991-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:1f7d1a520373e2272b10796c3ff721ea1a0712288cafaa95931e66aa15798813"}, - {file = "mypy-0.991-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:641411733b127c3e0dab94c45af15fea99e4468f99ac88b39efb1ad677da5711"}, - {file = "mypy-0.991-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:3d80e36b7d7a9259b740be6d8d906221789b0d836201af4234093cae89ced0cd"}, - {file = "mypy-0.991-cp37-cp37m-win_amd64.whl", hash = "sha256:e62ebaad93be3ad1a828a11e90f0e76f15449371ffeecca4a0a0b9adc99abcef"}, - {file = "mypy-0.991-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:b86ce2c1866a748c0f6faca5232059f881cda6dda2a893b9a8373353cfe3715a"}, - {file = "mypy-0.991-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ac6e503823143464538efda0e8e356d871557ef60ccd38f8824a4257acc18d93"}, - {file = "mypy-0.991-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:0cca5adf694af539aeaa6ac633a7afe9bbd760df9d31be55ab780b77ab5ae8bf"}, - {file = "mypy-0.991-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a12c56bf73cdab116df96e4ff39610b92a348cc99a1307e1da3c3768bbb5b135"}, - {file = "mypy-0.991-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:652b651d42f155033a1967739788c436491b577b6a44e4c39fb340d0ee7f0d70"}, - {file = "mypy-0.991-cp38-cp38-win_amd64.whl", hash = "sha256:4175593dc25d9da12f7de8de873a33f9b2b8bdb4e827a7cae952e5b1a342e243"}, - {file = "mypy-0.991-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:98e781cd35c0acf33eb0295e8b9c55cdbef64fcb35f6d3aa2186f289bed6e80d"}, - {file = "mypy-0.991-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:6d7464bac72a85cb3491c7e92b5b62f3dcccb8af26826257760a552a5e244aa5"}, - {file = "mypy-0.991-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c9166b3f81a10cdf9b49f2d594b21b31adadb3d5e9db9b834866c3258b695be3"}, - {file = "mypy-0.991-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b8472f736a5bfb159a5e36740847808f6f5b659960115ff29c7cecec1741c648"}, - {file = "mypy-0.991-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5e80e758243b97b618cdf22004beb09e8a2de1af481382e4d84bc52152d1c476"}, - {file = "mypy-0.991-cp39-cp39-win_amd64.whl", hash = "sha256:74e259b5c19f70d35fcc1ad3d56499065c601dfe94ff67ae48b85596b9ec1461"}, - {file = "mypy-0.991-py3-none-any.whl", hash = "sha256:de32edc9b0a7e67c2775e574cb061a537660e51210fbf6006b0b36ea695ae9bb"}, - {file = "mypy-0.991.tar.gz", hash = "sha256:3c0165ba8f354a6d9881809ef29f1a9318a236a6d81c690094c5df32107bde06"}, + {file = "mypy-1.11.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d42a6dd818ffce7be66cce644f1dff482f1d97c53ca70908dff0b9ddc120b77a"}, + {file = "mypy-1.11.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:801780c56d1cdb896eacd5619a83e427ce436d86a3bdf9112527f24a66618fef"}, + {file = "mypy-1.11.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:41ea707d036a5307ac674ea172875f40c9d55c5394f888b168033177fce47383"}, + {file = "mypy-1.11.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:6e658bd2d20565ea86da7d91331b0eed6d2eee22dc031579e6297f3e12c758c8"}, + {file = "mypy-1.11.2-cp310-cp310-win_amd64.whl", hash = "sha256:478db5f5036817fe45adb7332d927daa62417159d49783041338921dcf646fc7"}, + {file = "mypy-1.11.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:75746e06d5fa1e91bfd5432448d00d34593b52e7e91a187d981d08d1f33d4385"}, + {file = "mypy-1.11.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a976775ab2256aadc6add633d44f100a2517d2388906ec4f13231fafbb0eccca"}, + {file = "mypy-1.11.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:cd953f221ac1379050a8a646585a29574488974f79d8082cedef62744f0a0104"}, + {file = "mypy-1.11.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:57555a7715c0a34421013144a33d280e73c08df70f3a18a552938587ce9274f4"}, + {file = "mypy-1.11.2-cp311-cp311-win_amd64.whl", hash = "sha256:36383a4fcbad95f2657642a07ba22ff797de26277158f1cc7bd234821468b1b6"}, + {file = "mypy-1.11.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:e8960dbbbf36906c5c0b7f4fbf2f0c7ffb20f4898e6a879fcf56a41a08b0d318"}, + {file = "mypy-1.11.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:06d26c277962f3fb50e13044674aa10553981ae514288cb7d0a738f495550b36"}, + {file = "mypy-1.11.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6e7184632d89d677973a14d00ae4d03214c8bc301ceefcdaf5c474866814c987"}, + {file = "mypy-1.11.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:3a66169b92452f72117e2da3a576087025449018afc2d8e9bfe5ffab865709ca"}, + {file = "mypy-1.11.2-cp312-cp312-win_amd64.whl", hash = "sha256:969ea3ef09617aff826885a22ece0ddef69d95852cdad2f60c8bb06bf1f71f70"}, + {file = "mypy-1.11.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:37c7fa6121c1cdfcaac97ce3d3b5588e847aa79b580c1e922bb5d5d2902df19b"}, + {file = "mypy-1.11.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:4a8a53bc3ffbd161b5b2a4fff2f0f1e23a33b0168f1c0778ec70e1a3d66deb86"}, + {file = "mypy-1.11.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2ff93107f01968ed834f4256bc1fc4475e2fecf6c661260066a985b52741ddce"}, + {file = "mypy-1.11.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:edb91dded4df17eae4537668b23f0ff6baf3707683734b6a818d5b9d0c0c31a1"}, + {file = "mypy-1.11.2-cp38-cp38-win_amd64.whl", hash = "sha256:ee23de8530d99b6db0573c4ef4bd8f39a2a6f9b60655bf7a1357e585a3486f2b"}, + {file = "mypy-1.11.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:801ca29f43d5acce85f8e999b1e431fb479cb02d0e11deb7d2abb56bdaf24fd6"}, + {file = "mypy-1.11.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:af8d155170fcf87a2afb55b35dc1a0ac21df4431e7d96717621962e4b9192e70"}, + {file = "mypy-1.11.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f7821776e5c4286b6a13138cc935e2e9b6fde05e081bdebf5cdb2bb97c9df81d"}, + {file = "mypy-1.11.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:539c570477a96a4e6fb718b8d5c3e0c0eba1f485df13f86d2970c91f0673148d"}, + {file = "mypy-1.11.2-cp39-cp39-win_amd64.whl", hash = "sha256:3f14cd3d386ac4d05c5a39a51b84387403dadbd936e17cb35882134d4f8f0d24"}, + {file = "mypy-1.11.2-py3-none-any.whl", hash = "sha256:b499bc07dbdcd3de92b0a8b29fdf592c111276f6a12fe29c30f6c417dd546d12"}, + {file = "mypy-1.11.2.tar.gz", hash = "sha256:7f9993ad3e0ffdc95c2a14b66dee63729f021968bff8ad911867579c65d13a79"}, ] [package.dependencies] -mypy-extensions = ">=0.4.3" +mypy-extensions = ">=1.0.0" tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} -typing-extensions = ">=3.10" +typing-extensions = ">=4.6.0" [package.extras] dmypy = ["psutil (>=4.0)"] install-types = ["pip"] -python2 = ["typed-ast (>=1.4.0,<2)"] +mypyc = ["setuptools (>=50)"] reports = ["lxml"] [[package]] @@ -1623,4 +1620,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "8d6c8db9244c042ced272bf51b3ff7ae9ff18449e5369c14a8bdef09f6cf6170" +content-hash = "6defca072e29cad69f4647832f24dbb4e3379e59f2011172c310d5c7a52755b5" diff --git a/pyproject.toml b/pyproject.toml index ea7af9d2..135ebf69 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ aiomisc-pytest = "^1.1.1" collective-checkdocs = "^0.2" coverage = "^6.5.0" coveralls = "^3.3.1" -mypy = "^0.991" +mypy = "^1" nox = "^2022.11.21" pylama = "^8.4.1" pytest = "^7.4.0" diff --git a/tests/test_amqp.py b/tests/test_amqp.py index 5557b54d..a9a5a6ff 100644 --- a/tests/test_amqp.py +++ b/tests/test_amqp.py @@ -60,12 +60,13 @@ async def test_channel_close(self, connection: aio_pika.Connection): closed = False def on_close( - ch: aio_pika.abc.AbstractChannel, - exc: Optional[Exception] = None, + ch: Optional[aio_pika.abc.AbstractChannel], + exc: Optional[BaseException] = None, ): nonlocal event, closed log.info("Close called") closed = True + assert ch is not None assert ch.is_closed event.set() diff --git a/tests/test_amqp_robust.py b/tests/test_amqp_robust.py index 28985324..4b50270a 100644 --- a/tests/test_amqp_robust.py +++ b/tests/test_amqp_robust.py @@ -61,7 +61,7 @@ async def test_channel_blocking_timeout_reopen(self, connection): close_reasons = [] close_event = asyncio.Event() reopen_event = asyncio.Event() - channel.reopen_callbacks.add(lambda _: reopen_event.set()) + channel.reopen_callbacks.add(lambda *_: reopen_event.set()) queue_name = get_random_name("test_channel_blocking_timeout_reopen") diff --git a/tests/test_amqp_robust_proxy.py b/tests/test_amqp_robust_proxy.py index c9eb2d01..8d7c4b80 100644 --- a/tests/test_amqp_robust_proxy.py +++ b/tests/test_amqp_robust_proxy.py @@ -3,7 +3,7 @@ import logging from contextlib import suppress from functools import partial -from typing import Callable, List, Type +from typing import Callable, List, Type, Optional import aiomisc import aiormq.exceptions @@ -13,7 +13,7 @@ from yarl import URL import aio_pika -from aio_pika.abc import AbstractRobustChannel +from aio_pika.abc import AbstractRobustChannel, AbstractRobustConnection from aio_pika.exceptions import QueueEmpty, CONNECTION_EXCEPTIONS from aio_pika.message import Message from aio_pika.robust_channel import RobustChannel @@ -109,7 +109,7 @@ async def test_revive_passive_queue_on_reconnect( reconnect_event = asyncio.Event() reconnect_count = 0 - def reconnect_callback(conn): + def reconnect_callback(conn: Optional[AbstractRobustConnection]): nonlocal reconnect_count reconnect_count += 1 reconnect_event.set()