Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: raise ChannelInvalidStateError at exchange.publish with closed channel #637

Merged
merged 2 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions aio_pika/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ async def publish(
f"Can not publish to internal exchange: '{self.name}'!",
)

if self.channel.is_closed:
raise aiormq.exceptions.ChannelInvalidStateError(
"%r closed" % self.channel,
)

channel = await self.channel.get_underlay_channel()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't channel closure checked in this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, it is waiting for channel readiness forewer

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean waiting here?

async def get_underlay_channel(self) -> aiormq.abc.AbstractChannel:
await self._connection.ready()
return await super().get_underlay_channel()

There is waiting for connection readiness - not for channel readiness, and in the test you are closing connection, not channel. Maybe in publish method should be checked connection (not channel) and aiormq.exceptions.ConnectionClosed exception raised?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, that is correct. I just followed the aiormq exceptions style. Also, connection and channel and quite closely related, so they can be open/closed both only. I am checking channel due it is the most important thing for we functionality. Can @mosquito resolve the dispute?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change the code without any problems and basically agree with your points, but not sure about the person, who makes the final decision about PR merging

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway, we have no access to self.channel._connection object due it is RobustChannel-class attribute only. So, we should check self.channel to be compatible with any AbstractChannel implementation

return await channel.basic_publish(
exchange=self.name,
Expand Down
28 changes: 28 additions & 0 deletions tests/test_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,34 @@ async def test_simple_publish_and_receive_to_bound_exchange(

await queue.unbind(dest_exchange, routing_key)

async def test_simple_publish_with_closed_channel(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be called "test_simple_publish_with_closed_connection" ?

self,
connection: aio_pika.Connection,
declare_exchange: Callable,
declare_queue: Callable,
):
routing_key = get_random_name()

channel = await connection.channel(publisher_confirms=False)

exchange = await declare_exchange(
"direct", auto_delete=True, channel=channel,
)

await connection.close()

body = bytes(shortuuid.uuid(), "utf-8")

with pytest.raises(aiormq.exceptions.ChannelInvalidStateError):
await exchange.publish(
Message(
body,
content_type="text/plain",
headers={"foo": "bar"},
),
routing_key,
)

async def test_incoming_message_info(
self,
channel: aio_pika.Channel,
Expand Down
Loading