Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 1063 #1261

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion kombu/pidbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from . import Exchange, Queue, Consumer, Producer
from .clocks import LamportClock
from .common import maybe_declare, oid_from
from .exceptions import InconsistencyError
from .exceptions import InconsistencyError, OperationalError
from .five import range, string_t
from .log import get_logger
from .utils.functional import maybe_evaluate, reprcall
Expand Down Expand Up @@ -151,6 +151,14 @@ def reply(self, data, exchange, routing_key, ticket, **kwargs):
serializer=self.mailbox.serializer)


def is_no_route_error_for_reply_celery_pidbox(exc_str):
if "Cannot route message for exchange" in exc_str \
and "Table empty or key no longer exists" in exc_str \
and "reply.celery.pidbox" in exc_str:
return True
return False


class Mailbox(object):
"""Process Mailbox."""

Expand Down Expand Up @@ -284,6 +292,14 @@ def _publish_reply(self, reply, exchange, routing_key, ticket,
}, retry=True,
**opts
)
except OperationalError as exc:
# Fixes https://github.com/celery/kombu/issues/1063

if not exc.args and not is_no_route_error_for_reply_celery_pidbox(exc.args[0]):
raise

error('NO_ROUTE_ERROR caught: %r', exc, exc_info=1)

except InconsistencyError:
# queue probably deleted and no one is expecting a reply.
pass
Expand Down
24 changes: 23 additions & 1 deletion t/unit/test_pidbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

from case import Mock, patch

import kombu
from kombu import Connection
from kombu import pidbox
from kombu.exceptions import ContentDisallowed, InconsistencyError
from kombu.exceptions import ContentDisallowed, InconsistencyError, OperationalError
from kombu.transport import redis
from kombu.utils.uuid import uuid


Expand Down Expand Up @@ -58,6 +60,26 @@ def test_publish_reply_ignores_InconsistencyError(self):
)
producer.publish.assert_called()

def test_publish_reply_handles_redis_OperationalError_wth_no_route_error_msg(self):
mailbox = pidbox.Mailbox('reply.celery')(self.connection)
exchange = mailbox.reply_exchange.name
channel = self.connection.channel()
mailbox.reply_queue(channel).declare()
ticket = uuid()
# Using Channel._lookup as a proxy (in absence of a redis integration test) to mock the actual
# redis.get_table call which produces the OperationalError, since _lookup is just a level above
# redis.get_table on the stack trace.
with patch.object(kombu.transport.virtual.Channel, '_lookup') as simulate_redis_get_table_err:
# raise the actual redis error
simulate_redis_get_table_err.side_effect = OperationalError(
redis.NO_ROUTE_ERROR.format(exchange, mailbox.oid))
try:
mailbox._publish_reply({'foo': 'bar'}, exchange, mailbox.oid, ticket)
except OperationalError as exc:
if pidbox.is_no_route_error_for_reply_celery_pidbox(exc.args[0]):
pytest.fail("NO_ROUTE_ERROR with specific message should have been caught")


def test_reply__collect(self):
mailbox = pidbox.Mailbox('test_reply__collect')(self.connection)
exchange = mailbox.reply_exchange.name
Expand Down