Can't create channel in basic_consume callback #127

Closed
chemiron opened this issue Dec 12, 2016 · 4 comments

Comments

@chemiron
Contributor

Hi,
I need to create a temporary channel in the consume callback, but when I call the protocol.channel() method, execution hangs. Below is test code you can use to reproduce the issue:

import asyncio
import aioamqp


AMQP_SETTINGS = {
    'host': 'localhost',
    'port': 5672,
}

class Test:
    proto = None

    @asyncio.coroutine
    def callback(self, channel, body, envelope, properties):
        yield from self.proto.channel()  # the problem is here, comment the line and you will see output
        print(body)

    @asyncio.coroutine
    def connect(self):
        transport, self.proto = yield from aioamqp.connect(**AMQP_SETTINGS)
        channel = yield from self.proto.channel()
        yield from channel.queue_declare(queue_name='hello')
        yield from channel.basic_consume(self.callback, queue_name='hello', no_ack=True)

    @asyncio.coroutine
    def send(self):
        channel = yield from self.proto.channel()
        yield from channel.basic_publish(
            payload='Hello World!',
            exchange_name='',
            routing_key='hello'
        )

    @asyncio.coroutine
    def execute(self):
        yield from self.connect()
        yield from self.send()
        yield from asyncio.sleep(5)


test = Test()
asyncio.get_event_loop().run_until_complete(test.execute())

Is this a bug, or does AMQP not allow creating a new channel inside the callback?

@mwfrojdman
Contributor

mwfrojdman commented Dec 13, 2016

I think it's because the callback is executed from within the frame dispatcher task, and the dispatcher deadlocks. The dispatcher reads frames from the server in a loop and dispatches them to handlers, with basic.deliver handled by calling your callback function. Opening a new channel is synchronous: it sends channel.open to the server, and then waits for channel.open-ok to be received. The deadlock happens because the dispatcher is executing the callback, which is waiting for channel.open-ok, but the dispatcher cannot read that frame because it is stuck in the callback.

One way to work around the problem would probably be to create a task from the callback so the deadlock is avoided, along these lines:

@asyncio.coroutine
def handle_message(self, channel, body, envelope, properties):
    yield from self.proto.channel()
    print(body)

@asyncio.coroutine
def callback(self, channel, body, envelope, properties):
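    # Schedule the work as a separate task and return immediately,
    # so the frame dispatcher can keep reading frames.
    asyncio.ensure_future(self.handle_message(channel, body, envelope, properties))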

I bet the same deadlock happens for all other synchronous operations on the connection/channel, too.

EDIT: don't wait for task to finish in callback, that prevents the fix.
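
To make the deadlock described above concrete, here is a minimal sketch that does not use aioamqp at all. It simulates a dispatcher loop with an asyncio.Queue standing in for frames from the server. The names simulate, open_channel and on_deliver are made up for this illustration, and it uses async/await syntax (Python 3.7+) rather than the @asyncio.coroutine style used elsewhere in this thread.

```python
import asyncio


async def simulate(spawn_task, timeout=0.2):
    """Return "opened" if the simulated channel opens, "deadlock" otherwise."""
    frames = asyncio.Queue()  # stand-in for frames arriving from the server
    open_ok = asyncio.get_running_loop().create_future()
    background = []

    async def open_channel():
        # Stand-in for protocol.channel(): the server's channel.open-ok
        # reply arrives as a frame that only the dispatcher can read.
        await frames.put("channel.open-ok")
        await open_ok

    async def on_deliver():
        if spawn_task:
            # Workaround: schedule the work and return to the read loop.
            background.append(asyncio.ensure_future(open_channel()))
        else:
            # Problem case: the read loop is blocked inside the callback.
            await open_channel()

    async def dispatcher():
        while True:
            frame = await frames.get()
            if frame == "basic.deliver":
                await on_deliver()
            elif frame == "channel.open-ok":
                open_ok.set_result(None)

    task = asyncio.ensure_future(dispatcher())
    await frames.put("basic.deliver")
    try:
        # shield() keeps the timeout from cancelling open_ok itself.
        await asyncio.wait_for(asyncio.shield(open_ok), timeout)
        return "opened"
    except asyncio.TimeoutError:
        return "deadlock"
    finally:
        # Clean up the simulated dispatcher and any spawned work.
        for t in [task, *background]:
            t.cancel()
        await asyncio.gather(task, *background, return_exceptions=True)


if __name__ == "__main__":
    print(asyncio.run(simulate(spawn_task=False)))
    print(asyncio.run(simulate(spawn_task=True)))
```

With spawn_task=False the reply frame sits unread in the queue until the timeout fires. With spawn_task=True the dispatcher goes back to reading and resolves the pending open.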

@chemiron
Contributor Author

@mwfrojdman Thank you for your answer, it works for me. But I have one more question:
basic_publish has a 'mandatory' flag; according to the RabbitMQ documentation, if the flag is set and the queue doesn't exist, the server will return the unroutable message with a Return method. How can I check for this with aioamqp?

@mwfrojdman
Contributor

mwfrojdman commented Dec 13, 2016

I don't think you can. The basic.return method frame will end up in Channel.dispatch_frame, which raises a NotImplementedError because the method is not in the dictionary of supported methods. The exception is caught in AmqpProtocol.run, which logs it with the message "error on dispatch".

Basic.return is a bit problematic, because it's not possible to route it back to the publisher reliably. Ack and reject have delivery-tag which makes that easy, but return doesn't, so if there's been many messages published on the channel, it's difficult to know which one of them was returned. Comparing the message body/properties might be one way.

Another complication is that if not using RabbitMQ's publisher confirm extension, you simply can't know if the message was published successfully, or is still being processed and is going to be returned.
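
The idea above of comparing message properties could be sketched roughly as follows: tag each published message with a unique message_id property and keep a table of what was published. This is only an illustration. ReturnTracker and its methods are hypothetical names, and on_return assumes some hook that hands you the properties of a returned message, which, as noted above, aioamqp does not currently provide.

```python
import uuid


class ReturnTracker:
    """Remember published payloads so a returned message can be identified."""

    def __init__(self):
        self._pending = {}  # message_id -> payload

    def register(self, payload, properties=None):
        """Record a payload and return properties carrying a message_id."""
        properties = dict(properties or {})
        # Reuse a caller-supplied message_id, otherwise generate one.
        message_id = properties.setdefault("message_id", uuid.uuid4().hex)
        self._pending[message_id] = payload
        return properties

    def on_return(self, properties):
        """Hypothetical hook: look up the payload of a returned message."""
        return self._pending.pop(properties.get("message_id"), None)


if __name__ == "__main__":
    tracker = ReturnTracker()
    props = tracker.register(b"Hello World!")
    # props would be sent along with the publish; a returned message
    # carrying the same message_id maps back to the original payload.
    print(tracker.on_return(props))
```

Note that entries for messages that are never returned stay in the table. Without publisher confirms there is no reliable signal for when it is safe to drop them, which is the second complication mentioned above.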

@RemiCardona
Contributor


This is an unfortunate side-effect of aioamqp's current design. This is being worked on as part of PR #118. Any help would be much appreciated.

Thanks
