From c0f42636aa3ec9dea44472274e76cb0a9395c4d1 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Fri, 12 Jul 2024 13:57:42 +0300 Subject: [PATCH 1/2] fix: raise ChannelInvalidStateError at exchange.publish with closed channel call --- aio_pika/exchange.py | 3 +++ tests/test_amqp.py | 24 ++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/aio_pika/exchange.py b/aio_pika/exchange.py index ff1e5963..6287d724 100644 --- a/aio_pika/exchange.py +++ b/aio_pika/exchange.py @@ -190,6 +190,9 @@ async def publish( f"Can not publish to internal exchange: '{self.name}'!", ) + if self.channel.is_closed: + raise aiormq.exceptions.ChannelInvalidStateError("%r closed" % self.channel) + channel = await self.channel.get_underlay_channel() return await channel.basic_publish( exchange=self.name, diff --git a/tests/test_amqp.py b/tests/test_amqp.py index c104d3e4..225af408 100644 --- a/tests/test_amqp.py +++ b/tests/test_amqp.py @@ -339,6 +339,30 @@ async def test_simple_publish_and_receive_to_bound_exchange( await queue.unbind(dest_exchange, routing_key) + async def test_simple_publish_with_closed_channel( + self, + connection: aio_pika.Connection, + declare_exchange: Callable, + declare_queue: Callable, + ): + routing_key = get_random_name() + + channel = await connection.channel(publisher_confirms=False) + + exchange = await declare_exchange( + "direct", auto_delete=True, channel=channel, + ) + + await connection.close() + + body = bytes(shortuuid.uuid(), "utf-8") + + with pytest.raises(aiormq.exceptions.ChannelInvalidStateError): + await exchange.publish( + Message(body, content_type="text/plain", headers={"foo": "bar"}), + routing_key, + ) + async def test_incoming_message_info( self, channel: aio_pika.Channel, From 4e93dc4d0b8323e30f78e87c40dc1512d96b934e Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Fri, 12 Jul 2024 14:03:15 +0300 Subject: [PATCH 2/2] lint: fix line length --- aio_pika/exchange.py | 4 +++- tests/test_amqp.py | 6 +++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/aio_pika/exchange.py b/aio_pika/exchange.py index 6287d724..2e5cc3d1 100644 --- a/aio_pika/exchange.py +++ b/aio_pika/exchange.py @@ -191,7 +191,9 @@ async def publish( ) if self.channel.is_closed: - raise aiormq.exceptions.ChannelInvalidStateError("%r closed" % self.channel) + raise aiormq.exceptions.ChannelInvalidStateError( + "%r closed" % self.channel, + ) channel = await self.channel.get_underlay_channel() return await channel.basic_publish( diff --git a/tests/test_amqp.py b/tests/test_amqp.py index 225af408..3aff94fb 100644 --- a/tests/test_amqp.py +++ b/tests/test_amqp.py @@ -359,7 +359,11 @@ async def test_simple_publish_with_closed_channel( with pytest.raises(aiormq.exceptions.ChannelInvalidStateError): await exchange.publish( - Message(body, content_type="text/plain", headers={"foo": "bar"}), + Message( + body, + content_type="text/plain", + headers={"foo": "bar"}, + ), routing_key, )