Kombu disconnects on publish #857

Closed
stuartspotlight opened this issue May 1, 2018 · 20 comments

@stuartspotlight

I've already posted this in #463, but I figured it might be considered a separate issue, so I created this one as well.

When I do a lot of high-frequency consuming and publishing, kombu disconnects on a publish command with the following error:

  W_FORCE_CONNECT.format(attr=attr)))
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/nanowire_plugin/__init__.py", line 447, in send_to_next_plugin
    'max_retries':10})
  File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/usr/local/lib/python3.6/site-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1745, in basic_publish_confirm
    self.wait(spec.Basic.Ack)
  File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 79, in wait
    self.connection.drain_events(timeout=timeout)
  File "/usr/local/lib/python3.6/site-packages/amqp/connection.py", line 471, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/lib/python3.6/site-packages/amqp/connection.py", line 477, in blocking_read
    return self.on_inbound_frame(frame)
  File "/usr/local/lib/python3.6/site-packages/amqp/method_framing.py", line 55, in on_frame
    callback(channel, method_sig, buf, None)
  File "/usr/local/lib/python3.6/site-packages/amqp/connection.py", line 480, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
TypeError: 'NoneType' object is not subscriptable

It fails to reconnect even after 10 retry attempts:

            producer.publish(send_payload, exchange='', routing_key=next_plugin, retry=True, 
                             retry_policy={'interval_start':1,
                                           'interval_step':2,
                                           'interval_max':10,
                                           'max_retries':10})
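
To get a feel for how long this retry policy waits in total, here is a rough model of the delays. It assumes a plain linear backoff that starts at interval_start, grows by interval_step on each retry and is capped at interval_max, as the key names suggest. The helper `retry_delays` is hypothetical and kombu's exact schedule may differ slightly.

```python
def retry_delays(interval_start, interval_step, interval_max, max_retries):
    """Return the delay in seconds before each retry (capped linear backoff).

    Simplified model for illustration only, not kombu's implementation.
    """
    delays = []
    for attempt in range(max_retries):
        delay = interval_start + attempt * interval_step
        delays.append(min(delay, interval_max))
    return delays


# The policy from the publish call above.
policy = {'interval_start': 1, 'interval_step': 2,
          'interval_max': 10, 'max_retries': 10}

delays = retry_delays(**policy)
total_wait = sum(delays)
print(delays, total_wait)
```

Under this model the publish call would spend a little over a minute retrying before giving up, so a failure that surfaces immediately is probably not going through the retry path at all.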

Also worth noting is that I get the following warning when kombu attempts to reconnect:-

/usr/local/lib/python3.6/site-packages/amqp/connection.py:292: AMQPDeprecationWarning: The .transport attribute on the connection was accessed before
the connection was established.  This is supported for now, but will
be deprecated in amqp 2.2.0.

Since amqp 2.0 you have to explicitly call Connection.connect()
before using the connection.

I assume this warning will disappear when we move to kombu 4.2 but is there any word on the publish disconnect error?

@stuartspotlight
Author

Is this related to #795?
Will it be fixed by a move to kombu 4.2?

@stuartspotlight
Author

Does anyone have any insight into this issue? It renders the entire library borderline useless.

@stuartspotlight
Author

I've tried a clone of kombu version 4.2 and the issue remains.

@stuartspotlight
Author

More information on this bug: it occurs much more often when the time between consuming and publishing is long (more than 10 seconds), and never on the first publish after the connection is established. A fairly reliable way to reproduce it is to send 2 messages that each take longer than 10 seconds to process; the error almost always triggers while handling the second one.

@stuartspotlight
Author

I have found a workaround for this problem: I placed the publish command in a for loop with a try/except that repeats the publish if it fails. This seems to duplicate what the retry option should already do, however. Does anyone have any further insight into this?
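
A minimal sketch of that kind of loop, written against plain callables so it runs without a broker. The helper `publish_with_manual_retry` and the `publish`/`revive` callables are hypothetical names for illustration; with kombu they would wrap the real publish call and whatever is used to rebuild the channel.

```python
import logging
import time


def publish_with_manual_retry(publish, revive, attempts=3, delay=0.0,
                              errors=(OSError,)):
    """Call publish(); on a listed error call revive() and try again.

    Re-raises the last error once `attempts` tries have failed.
    """
    if attempts < 1:
        raise ValueError('attempts must be at least 1')
    last_exc = None
    for attempt in range(1, attempts + 1):
        try:
            return publish()
        except errors as exc:
            last_exc = exc
            logging.warning('publish attempt %d/%d failed: %r',
                            attempt, attempts, exc)
            if attempt < attempts:
                revive()           # rebuild channel/producer before retrying
                time.sleep(delay)
    raise last_exc


# Stand-ins that fail once and then succeed, mimicking the behaviour
# described above (the second attempt works).
calls = {'publish': 0, 'revive': 0}


def flaky_publish():
    calls['publish'] += 1
    if calls['publish'] == 1:
        raise OSError('simulated broken pipe')
    return 'published'


def fake_revive():
    calls['revive'] += 1


result = publish_with_manual_retry(flaky_publish, fake_revive)
print(result, calls)
```

The `errors` tuple decides which exceptions count as retryable; anything else propagates immediately, so it needs to list the errors actually seen in the traceback.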

@maingoh

maingoh commented Jun 5, 2018

I am having a similar issue with the code below and kombu 4.1.0:

from kombu import Connection, Producer, Queue
import os
import time
import logging

logging.basicConfig(level='INFO')


def get_queue(channel, queue_name):
    queue = Queue(channel=channel,
                  name=queue_name,
                  routing_key=queue_name,
                  exchange=None,
                  passive=False,
                  durable=True,
                  exclusive=False,
                  auto_delete=False)
    queue.declare()
    return queue


def errback(exc, interval):
    logging.error('Error: %r', exc)
    logging.info('Retry in %s seconds.', interval)


connection = Connection(os.getenv('AMQP_URL'), transport='pyamqp', heartbeat=30, transport_options={'confirm_publish': True})

queue = get_queue(connection.default_channel, 'foo')

producer = Producer(connection)

logging.info("Sleeping")
time.sleep(130)

retry_policy = {'interval_start': 0, 'interval_step': 1, 'max_retries': 3}

while True:

    logging.info('Sending to queue %s' % queue)
    send = connection.ensure(producer, producer.publish, errback=errback)
    send({"hey": "you"}, routing_key=queue.name, retry=True, retry_policy=retry_policy)

    logging.info('Getting from queue %s' % queue)
    recv = connection.ensure(queue, queue.get, errback=errback)
    recv(no_ack=True)
    time.sleep(1)

Each send triggers a disconnection. It seems to happen only after some inactivity on the connection (simulated with the sleep(130)), but it should reconnect once and shouldn't raise errors on every loop iteration. When getting a message from the queue, the queue seems to be unbound from any channel (while it is bound to channel 1 when sending the message).
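
Note that the script sets heartbeat=30 but nothing sends heartbeats during the sleep, which would explain the initial "missed heartbeats from client" disconnect in the logs below. One way to keep the connection alive while idle is to sleep in short slices and run a heartbeat check between them. This is only a sketch: `sleep_with_heartbeats` is a hypothetical helper and the `check` callable is a stand-in for something like `connection.heartbeat_check()`.

```python
import time


def sleep_with_heartbeats(total, heartbeat, check, rate=2, sleep=time.sleep):
    """Sleep for `total` seconds, calling check() every heartbeat/rate seconds.

    Returns the number of times check() was called.
    """
    tick = heartbeat / rate
    remaining = float(total)
    ticks = 0
    while remaining > 0:
        step = min(tick, remaining)
        sleep(step)
        remaining -= step
        check()                    # e.g. connection.heartbeat_check()
        ticks += 1
    return ticks


# Dry run with a fake sleep so the example finishes instantly.
slept = []
checks = []
ticks = sleep_with_heartbeats(
    total=130, heartbeat=30,
    check=lambda: checks.append(True),
    sleep=slept.append,
)
print(ticks, slept)
```

This only avoids the idle disconnect; it does not address the repeated failures after the first reconnect, which is the actual problem reported here.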

Here are my logs (I use a docker-compose app to test, I can provide it if you want)

rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:44:19 ===
rabbitmq_1  | accepting AMQP connection <0.558.0> (172.20.0.3:44638 -> 172.20.0.2:5672)
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:44:19 ===
rabbitmq_1  | connection <0.558.0> (172.20.0.3:44638 -> 172.20.0.2:5672): user 'user' authenticated and granted access to vhost 'test'
python_client | INFO:root:Sleeping
rabbitmq_1  | 
rabbitmq_1  | =ERROR REPORT==== 5-Jun-2018::16:45:49 ===
rabbitmq_1  | closing AMQP connection <0.558.0> (172.20.0.3:44638 -> 172.20.0.2:5672):
rabbitmq_1  | missed heartbeats from client, timeout: 30s
python_client | INFO:root:Sending to queue <Queue foo -> <Exchange u''(direct) bound to chan:1> -> foo bound to chan:1>
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:29 ===
rabbitmq_1  | accepting AMQP connection <0.582.0> (172.20.0.3:44656 -> 172.20.0.2:5672)
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:29 ===
rabbitmq_1  | connection <0.582.0> (172.20.0.3:44656 -> 172.20.0.2:5672): user 'user' authenticated and granted access to vhost 'test'
python_client | INFO:root:Getting from queue <Queue foo -> <Exchange u''(direct) bound to chan:1> -> foo bound to chan:1>
rabbitmq_1  | 
rabbitmq_1  | =WARNING REPORT==== 5-Jun-2018::16:46:29 ===
rabbitmq_1  | closing AMQP connection <0.582.0> (172.20.0.3:44656 -> 172.20.0.2:5672, vhost: 'test', user: 'user'):
rabbitmq_1  | client unexpectedly closed TCP connection
python_client | ERROR:root:Error: error(32, 'Broken pipe')
python_client | INFO:root:Retry in 0 seconds.
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:29 ===
rabbitmq_1  | accepting AMQP connection <0.595.0> (172.20.0.3:44658 -> 172.20.0.2:5672)
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:29 ===
rabbitmq_1  | connection <0.595.0> (172.20.0.3:44658 -> 172.20.0.2:5672): user 'user' authenticated and granted access to vhost 'test'
python_client | INFO:root:Sending to queue <Queue foo -> <Exchange u''(direct) bound to chan:1> -> foo bound to chan:1>
rabbitmq_1  | 
rabbitmq_1  | =WARNING REPORT==== 5-Jun-2018::16:46:30 ===
rabbitmq_1  | closing AMQP connection <0.595.0> (172.20.0.3:44658 -> 172.20.0.2:5672, vhost: 'test', user: 'user'):
rabbitmq_1  | client unexpectedly closed TCP connection
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:30 ===
rabbitmq_1  | accepting AMQP connection <0.608.0> (172.20.0.3:44660 -> 172.20.0.2:5672)
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:30 ===
rabbitmq_1  | connection <0.608.0> (172.20.0.3:44660 -> 172.20.0.2:5672): user 'user' authenticated and granted access to vhost 'test'
python_client | INFO:root:Getting from queue <Queue foo -> <Exchange u''(direct) bound to chan:None> -> foo bound to chan:None>
python_client | ERROR:root:Error: RecoverableConnectionError(None, u'connection already closed', None, u'')
python_client | INFO:root:Retry in 0 seconds.
rabbitmq_1  | 
rabbitmq_1  | =WARNING REPORT==== 5-Jun-2018::16:46:30 ===
rabbitmq_1  | closing AMQP connection <0.608.0> (172.20.0.3:44660 -> 172.20.0.2:5672, vhost: 'test', user: 'user'):
rabbitmq_1  | client unexpectedly closed TCP connection
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:30 ===
rabbitmq_1  | accepting AMQP connection <0.621.0> (172.20.0.3:44662 -> 172.20.0.2:5672)
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:30 ===
rabbitmq_1  | connection <0.621.0> (172.20.0.3:44662 -> 172.20.0.2:5672): user 'user' authenticated and granted access to vhost 'test'
python_client | INFO:root:Sending to queue <Queue foo -> <Exchange u''(direct) bound to chan:1> -> foo bound to chan:1>
rabbitmq_1  | 
rabbitmq_1  | =WARNING REPORT==== 5-Jun-2018::16:46:31 ===
rabbitmq_1  | closing AMQP connection <0.621.0> (172.20.0.3:44662 -> 172.20.0.2:5672, vhost: 'test', user: 'user'):
rabbitmq_1  | client unexpectedly closed TCP connection
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:31 ===
rabbitmq_1  | accepting AMQP connection <0.634.0> (172.20.0.3:44664 -> 172.20.0.2:5672)
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:31 ===
rabbitmq_1  | connection <0.634.0> (172.20.0.3:44664 -> 172.20.0.2:5672): user 'user' authenticated and granted access to vhost 'test'
python_client | INFO:root:Getting from queue <Queue foo -> <Exchange u''(direct) bound to chan:None> -> foo bound to chan:None>
python_client | ERROR:root:Error: RecoverableConnectionError(None, u'connection already closed', None, u'')
python_client | INFO:root:Retry in 0 seconds.
rabbitmq_1  | 
rabbitmq_1  | =WARNING REPORT==== 5-Jun-2018::16:46:31 ===
rabbitmq_1  | closing AMQP connection <0.634.0> (172.20.0.3:44664 -> 172.20.0.2:5672, vhost: 'test', user: 'user'):
rabbitmq_1  | client unexpectedly closed TCP connection
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:31 ===
rabbitmq_1  | accepting AMQP connection <0.647.0> (172.20.0.3:44666 -> 172.20.0.2:5672)
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:31 ===
rabbitmq_1  | connection <0.647.0> (172.20.0.3:44666 -> 172.20.0.2:5672): user 'user' authenticated and granted access to vhost 'test'
python_client | INFO:root:Sending to queue <Queue foo -> <Exchange u''(direct) bound to chan:1> -> foo bound to chan:1>
rabbitmq_1  | 
rabbitmq_1  | =WARNING REPORT==== 5-Jun-2018::16:46:32 ===
rabbitmq_1  | closing AMQP connection <0.647.0> (172.20.0.3:44666 -> 172.20.0.2:5672, vhost: 'test', user: 'user'):
rabbitmq_1  | client unexpectedly closed TCP connection
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:32 ===
rabbitmq_1  | accepting AMQP connection <0.660.0> (172.20.0.3:44668 -> 172.20.0.2:5672)
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:32 ===
rabbitmq_1  | connection <0.660.0> (172.20.0.3:44668 -> 172.20.0.2:5672): user 'user' authenticated and granted access to vhost 'test'
python_client | INFO:root:Getting from queue <Queue foo -> <Exchange u''(direct) bound to chan:None> -> foo bound to chan:None>
python_client | ERROR:root:Error: RecoverableConnectionError(None, u'connection already closed', None, u'')
python_client | INFO:root:Retry in 0 seconds.
rabbitmq_1  | 
rabbitmq_1  | =WARNING REPORT==== 5-Jun-2018::16:46:32 ===
rabbitmq_1  | closing AMQP connection <0.660.0> (172.20.0.3:44668 -> 172.20.0.2:5672, vhost: 'test', user: 'user'):
rabbitmq_1  | client unexpectedly closed TCP connection
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:32 ===
rabbitmq_1  | accepting AMQP connection <0.673.0> (172.20.0.3:44670 -> 172.20.0.2:5672)
rabbitmq_1  | 
rabbitmq_1  | =INFO REPORT==== 5-Jun-2018::16:46:32 ===
rabbitmq_1  | connection <0.673.0> (172.20.0.3:44670 -> 172.20.0.2:5672): user 'user' authenticated and granted access to vhost 'test'

@maingoh

maingoh commented Jun 5, 2018

I just tried to change the publish part :

connection.ensure + retry=False => Not working:

send = connection.ensure(producer, producer.publish, errback=errback)
send({"hey": "you"}, routing_key=queue.name)

No connection.ensure + retry=True => Not working:

producer.publish({"hey": "you"}, routing_key=queue.name, retry=True)

No connection.ensure + retry=False + manual retry => Working:

import socket
import traceback  # needed for traceback.format_exc() below
from amqp.exceptions import RecoverableConnectionError

while True:
    try:
        producer.publish({"hey": "you"}, routing_key=queue.name)
        break
    except (socket.error, RecoverableConnectionError):
        logging.error(traceback.format_exc())
        producer.revive(connection.default_channel)

The last variant seems to be the workaround @stuartspotlight uses, but it probably doesn't handle reconnection well.

@maingoh

maingoh commented Jun 5, 2018

The bug seems to have appeared in kombu 4.0.0. It works as expected in 3.0.37.

@maingoh

maingoh commented Jun 6, 2018

If I remove the consuming part, this doesn't disconnect in loop (just reconnect once and then publish as expected).

@maingoh

maingoh commented Jun 7, 2018

With code similar to what I posted earlier, and under heavy publishing/consuming, I got the exact same stack trace as @stuartspotlight:

python_client  | [ERROR 2018-06-07 08:00:07,696 50 139948186208000 AMQPClient.py:140] Error: error(104, 'Connection reset by peer')
python_client  | Traceback (most recent call last):
python_client  |   File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 494, in _ensured
python_client  |     return fun(*args, **kwargs)
python_client  |   File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 266, in channel
python_client  |     chan = self.transport.create_channel(self.connection)
python_client  |   File "/usr/local/lib/python2.7/dist-packages/kombu/transport/pyamqp.py", line 100, in create_channel
python_client  |     return connection.channel()
python_client  |   File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 482, in channel
python_client  |     channel.open()
python_client  |   File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 432, in open
python_client  |     spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,
python_client  |   File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 50, in send_method
python_client  |     conn.frame_writer(1, self.channel_id, sig, args, content)
python_client  |   File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 166, in write_frame
python_client  |     write(view[:offset])
python_client  |   File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 275, in write
python_client  |     self._write(s)
python_client  |   File "/usr/lib/python2.7/socket.py", line 228, in meth
python_client  |     return getattr(self._sock,name)(*args)
python_client  | error: [Errno 104] Connection reset by peer
python_client  | [INFO 2018-06-07 08:00:07,698 50 139948186208000 AMQPClient.py:141] Retry in 0 seconds.
python_client  | [INFO 2018-06-07 08:00:07,708 50 139948186208000 AMQPClient.py:144] Revived channel (channel_id:1, object_id:139947998444816)
python_client  | [INFO 2018-06-07 08:00:07,711 50 139948186208000 AMQPClient.py:223] Creating new queue
python_client  | /usr/local/lib/python2.7/dist-packages/amqp/connection.py:312: AMQPDeprecationWarning: The .transport attribute on the connection was accessed before
python_client  | the connection was established.  This is supported for now, but will
python_client  | be deprecated in amqp 2.2.0.
python_client  | 
python_client  | Since amqp 2.0 you have to explicitly call Connection.connect()
python_client  | before using the connection.
python_client  | 
python_client  |   W_FORCE_CONNECT.format(attr=attr)))
python_client  | [ERROR 2018-06-07 08:00:07,714 50 139948186208000 exception.py:135] Internal Server Error: /recognize
python_client  | Traceback (most recent call last):
python_client  |   File "./app/AMQPClient.py", line 278, in asyncRequest
python_client  |     properties=properties)
python_client  |   File "./app/AMQPClient.py", line 268, in send
python_client  |     self.ensure(self.producer, self.producer.publish, data, exchange=exchange, routing_key=routing_key, retry=True, retry_policy=retry_policy, correlation_id=correlation_id, **properties)
python_client  |   File "./app/AMQPClient.py", line 161, in ensure
python_client  |     return f(*args, **kwargs)
python_client  |   File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 494, in _ensured
python_client  |     return fun(*args, **kwargs)
python_client  |   File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 181, in publish
python_client  |     exchange_name, declare,
python_client  |   File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 494, in _ensured
python_client  |     return fun(*args, **kwargs)
python_client  |   File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 203, in _publish
python_client  |     mandatory=mandatory, immediate=immediate,
python_client  |   File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1743, in basic_publish_confirm
python_client  |     self.wait(spec.Basic.Ack)
python_client  |   File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 79, in wait
python_client  |     self.connection.drain_events(timeout=timeout)
python_client  |   File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 491, in drain_events
python_client  |     while not self.blocking_read(timeout):
python_client  |   File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 497, in blocking_read
python_client  |     return self.on_inbound_frame(frame)
python_client  |   File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 55, in on_frame
python_client  |     callback(channel, method_sig, buf, None)
python_client  |   File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 500, in on_inbound_method
python_client  |     return self.channels[channel_id].dispatch_method(
python_client  | TypeError: 'NoneType' object has no attribute '__getitem__'

I don't have more information on what raised the Connection reset by peer; it was probably because a missed heartbeat disconnected us. So it should be a normal reconnection. However, what happens next is not normal.

@stuartspotlight
Author

Sorry @maingoh, did you say you could fix the problem by removing the consuming section?

If I remove the consuming part, this doesn't disconnect in loop (just reconnect once and then publish as expected).

Does this mean you remove an explicit declaration of a consumer and use a consumer built into the "ConsumerProducerMixin" or that you don't use a consumer at all and only publish?

@maingoh

maingoh commented Jul 23, 2018

Sorry, by "consuming part" I meant queue.get. I didn't try with a real Consumer, but it probably won't disconnect with a consumer, since that will keep the connection active?

@stuartspotlight
Author

I'm using a consumer and I'm having problems with the connection being closed on ack and not being able to recover. I tried the for loop trick and it just failed 30 times in a row. This is a major issue as it's leading to messages being duplicated. When the ack fails I get this:

finished running user code on Monster Manual.pdf at 2018-07-23 16:05:35.685752
RUNNING SEND TO NEXT PLUGIN
CREATING PAYLOAD STRING
SET UP THE PRODUCER
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
Trying to publish result to email-extraction
Output was published for Monster Manual.pdf
HAD TROUBLE ACKING TRYING ANOTHER 30 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 29 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 28 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 27 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 26 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 25 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 24 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 23 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 22 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 21 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 20 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 19 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 18 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 17 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 16 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 15 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 14 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 13 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 12 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 11 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 10 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 9 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 8 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 7 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 6 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 5 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 4 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 3 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 2 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
HAD TROUBLE ACKING TRYING ANOTHER 1 times
CONNECTION IS UP:- True
OUT CHANNEL IS UP:- True
---
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/nanowire_plugin/__init__.py", line 511, in send_to_next_plugin
    message.ack()
  File "/usr/local/lib/python3.6/site-packages/kombu/message.py", line 125, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "/usr/local/lib/python3.6/site-packages/amqp/channel.py", line 1392, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "/usr/local/lib/python3.6/site-packages/amqp/abstract_channel.py", line 47, in send_method
    raise RecoverableConnectionError('connection already closed')
amqp.exceptions.RecoverableConnectionError: connection already closed

==================================
COULD NOT ACK, KILLING MAIN THREAD

@auvipy
Member

auvipy commented Aug 13, 2018

It's a shame that this issue didn't get the attention it needed. Sorry, guys! Did you find any workaround?

@stuartspotlight
Author

stuartspotlight commented Aug 14, 2018

Manually repeating the publish command seems to work on the second attempt every time. The bigger issue is the failure to ack, which is non-recoverable. This could be fixed by letting the user ack a message using the delivery tag on a different connection from the one it was received on, but I'm not sure how I would go about implementing such a thing. I tried manually rebuilding a connection and assigning it to the message, but the ack I sent didn't take: it raised no errors, but RabbitMQ did not remove the message from the queue. Between my error with acking and @maingoh's error with reconnecting for a second attempt at publishing, I think being able to spawn a new connection that the message could use once the initial connection goes down would help us both.
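
For context, AMQP delivery tags are scoped to the channel the message was delivered on, so an ack sent over a freshly built connection cannot refer to the old delivery; the broker requeues the unacked message and delivers it again. A common way to live with that is to make processing idempotent. The sketch below assumes each message carries an application-level ID (for example a correlation ID) that survives redelivery; `DedupingHandler` is a hypothetical helper, not part of kombu.

```python
class DedupingHandler:
    """Run `process` at most once per message ID, even on redelivery."""

    def __init__(self, process):
        self._process = process
        self._seen = set()   # in-memory only; a real service would persist this

    def handle(self, message_id, body):
        """Return True if the body was processed, False if it was a repeat."""
        if message_id in self._seen:
            return False
        self._process(body)
        self._seen.add(message_id)
        return True


processed = []
handler = DedupingHandler(processed.append)

first = handler.handle('job-1', 'Monster Manual.pdf')
repeat = handler.handle('job-1', 'Monster Manual.pdf')   # redelivered after a lost ack
second = handler.handle('job-2', 'another.pdf')
print(processed)
```

This does not fix the failed ack itself; it only keeps a redelivered message from being processed and forwarded twice.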

@maingoh

maingoh commented Feb 7, 2019

I have a new snippet (using kombu 4.3) that triggers these connection errors without any publisher. It seems to be a bug in how queues are revived? It uses two queues and calls get (or any other operation) on each:

connection = Connection(url, transport='pyamqp', transport_options={'confirm_publish': True})

connection.connect()

queue_foo = get_queue(connection, 'foo')
queue_bar = get_queue(connection, 'bar')

# Simulating a connection closed from the server
connection.close()

# we can call connection.connect() here, it will be the same

for i in range(4):

    logging.info('queue foo %s' % queue_foo)
    recv = connection.ensure(queue_foo, queue_foo.get, errback=errback)
    recv(no_ack=True)
    logging.info('queue foo: %s' % queue_foo)

    logging.info('queue bar %s' % queue_bar)
    recv = connection.ensure(queue_bar, queue_bar.get, errback=errback)
    recv(no_ack=True)
    logging.info('queue bar: %s' % queue_bar)

    time.sleep(1)

It seems that when one queue is revived, the other one gets unbound?
With pyamqp transport:

INFO:root:queue foo <Queue foo -> <Exchange u''(direct) bound to chan:None> -> foo bound to chan:None>
ERROR:root:Error: RecoverableConnectionError(None, u'connection already closed', None, u'')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 694, in get
    message = self.channel.basic_get(queue=self.name, no_ack=no_ack)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1639, in basic_get
    wait=[spec.Basic.GetOk, spec.Basic.GetEmpty], returns_tuple=True,
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 47, in send_method
    raise RecoverableConnectionError('connection already closed')
RecoverableConnectionError: connection already closed
INFO:root:Retry in 0 seconds.
INFO:root:queue foo: <Queue foo -> <Exchange u''(direct) bound to chan:1> -> foo bound to chan:1>
INFO:root:queue bar <Queue bar -> <Exchange u''(direct) bound to chan:None> -> bar bound to chan:None>
ERROR:root:Error: RecoverableConnectionError(None, u'connection already closed', None, u'')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 694, in get
    message = self.channel.basic_get(queue=self.name, no_ack=no_ack)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1639, in basic_get
    wait=[spec.Basic.GetOk, spec.Basic.GetEmpty], returns_tuple=True,
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 47, in send_method
    raise RecoverableConnectionError('connection already closed')
RecoverableConnectionError: connection already closed
INFO:root:Retry in 0 seconds.
INFO:root:queue bar: <Queue bar -> <Exchange u''(direct) bound to chan:1> -> bar bound to chan:1>
INFO:root:queue foo <Queue foo -> <Exchange u''(direct) bound to chan:None> -> foo bound to chan:None>
ERROR:root:Error: RecoverableConnectionError(None, u'connection already closed', None, u'')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 694, in get
    message = self.channel.basic_get(queue=self.name, no_ack=no_ack)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1639, in basic_get
    wait=[spec.Basic.GetOk, spec.Basic.GetEmpty], returns_tuple=True,
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 47, in send_method
    raise RecoverableConnectionError('connection already closed')
RecoverableConnectionError: connection already closed
INFO:root:Retry in 0 seconds.
INFO:root:queue foo: <Queue foo -> <Exchange u''(direct) bound to chan:1> -> foo bound to chan:1>
INFO:root:queue bar <Queue bar -> <Exchange u''(direct) bound to chan:None> -> bar bound to chan:None>
ERROR:root:Error: RecoverableConnectionError(None, u'connection already closed', None, u'')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 694, in get
    message = self.channel.basic_get(queue=self.name, no_ack=no_ack)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1639, in basic_get
    wait=[spec.Basic.GetOk, spec.Basic.GetEmpty], returns_tuple=True,
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 47, in send_method
    raise RecoverableConnectionError('connection already closed')
RecoverableConnectionError: connection already closed
INFO:root:Retry in 0 seconds.
INFO:root:queue bar: <Queue bar -> <Exchange u''(direct) bound to chan:1> -> bar bound to chan:1>

With librabbitmq transport, here the connection seems to be None:

INFO:root:queue foo <Queue foo -> <Exchange u''(direct) bound to chan:1> -> foo bound to chan:1>
ERROR:root:Error: ConnectionError('Operation on closed connection',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 694, in get
    message = self.channel.basic_get(queue=self.name, no_ack=no_ack)
  File "/usr/local/lib/python2.7/dist-packages/librabbitmq/__init__.py", line 72, in basic_get
    frame = self.connection._basic_get(self.channel_id, queue, no_ack)
ConnectionError: Operation on closed connection
INFO:root:Retry in 0 seconds.
INFO:root:queue foo: <Queue foo -> <Exchange u''(direct) bound to chan:1> -> foo bound to chan:1>
INFO:root:queue bar <Queue bar -> <Exchange u''(direct) bound to chan:1> -> bar bound to chan:1>
ERROR:root:Error: ConnectionError('Operation on closed connection',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 694, in get
    message = self.channel.basic_get(queue=self.name, no_ack=no_ack)
  File "/usr/local/lib/python2.7/dist-packages/librabbitmq/__init__.py", line 72, in basic_get
    frame = self.connection._basic_get(self.channel_id, queue, no_ack)
ConnectionError: Operation on closed connection
INFO:root:Retry in 0 seconds.
INFO:root:queue bar: <Queue bar -> <Exchange u''(direct) bound to chan:1> -> bar bound to chan:1>
INFO:root:queue foo <Queue foo -> <Exchange u''(direct) bound to chan:1> -> foo bound to chan:1>
Traceback (most recent call last):
  File "debug_kombu.py", line 69, in <module>
    recv(no_ack=True)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 694, in get
    message = self.channel.basic_get(queue=self.name, no_ack=no_ack)
  File "/usr/local/lib/python2.7/dist-packages/librabbitmq/__init__.py", line 72, in basic_get
    frame = self.connection._basic_get(self.channel_id, queue, no_ack)
AttributeError: 'NoneType' object has no attribute '_basic_get'

Manually reviving the other queue seems to work, but do I really have to revive every object I created whenever a connection error occurs?

def on_revive(channel):
    queue_bar.revive(channel)

recv = connection.ensure(queue_foo, queue_foo.get, errback=errback, on_revive=on_revive)
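
If every entity does need reviving, the callback can be generalised so it does not have to be rewritten per queue. A small sketch, assuming only that each registered object exposes a revive(channel) method as the queues above do; `ReviveAll` and `FakeEntity` are hypothetical names used for illustration.

```python
class ReviveAll:
    """Callable that revives every registered entity on the new channel."""

    def __init__(self, *entities):
        self._entities = list(entities)

    def register(self, entity):
        self._entities.append(entity)

    def __call__(self, channel):
        for entity in self._entities:
            entity.revive(channel)


class FakeEntity:
    """Stand-in for a Queue or Producer, used only to exercise the sketch."""

    def __init__(self):
        self.channel = None

    def revive(self, channel):
        self.channel = channel


foo, bar = FakeEntity(), FakeEntity()
on_revive = ReviveAll(foo)
on_revive.register(bar)

on_revive('chan:1')   # what ensure() would do after reconnecting
print(foo.channel, bar.channel)
```

It would be passed as on_revive=on_revive in the ensure call, in place of the single-queue function.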

@maingoh

maingoh commented Feb 7, 2019

It seems to me that when a connection occurs, it calls the transport collect, which leaves previously created entities in a bad state?
If I patch connection.collect by commenting out the line gc_transport(self._connection), both queues trigger a connection error in the first iteration, but then they become stable and don't trigger anything.

@jorgii

jorgii commented Feb 17, 2019

I experienced the same issue. As a workaround it looks like adding retry=True fixes it.

producer.publish(
            body=message,
            exchange=exchange,
            declare=[queue],
            retry=True)

@emukans

emukans commented May 8, 2019

I experienced the same issue. The problem was in the qos prefetch_size. The message queue was too large and prefetch was set to 0, which means the consumer tried to prefetch the whole queue. The solution would be to reduce the qos prefetch_size or, if that is not possible, to purge the message queue and resend the messages in smaller batches.
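
For the second option, the resend step could look roughly like this. It is a broker-free sketch: `in_batches` is a hypothetical helper and the integers stand in for real message bodies; each batch would be published and allowed to drain before the next one is sent.

```python
def in_batches(items, batch_size):
    """Yield consecutive slices of `items`, each at most `batch_size` long."""
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


messages = list(range(7))           # stand-ins for purged message bodies
batches = list(in_batches(messages, 3))
print(batches)
```

For the first option, kombu consumers expose a qos() method that accepts a prefetch_count; that call is not exercised here.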

@auvipy auvipy added this to the 4.5.x Maintenance milestone May 8, 2019
@auvipy auvipy modified the milestones: 4.6.0, 4.7 Nov 2, 2019
@auvipy auvipy removed this from the 5.1.0 milestone Sep 12, 2021
@auvipy
Member

auvipy commented Sep 12, 2021

I experienced the same issue. The problem was in the qos prefetch_size. The message queue was too large and prefetch was set to 0, which means the consumer tried to prefetch the whole queue. The solution would be to reduce the qos prefetch_size or, if that is not possible, to purge the message queue and resend the messages in smaller batches.

closing the issue based on this.

@auvipy auvipy closed this as completed Sep 12, 2021