Skip to content

Commit

Permalink
SQS Channel.predefined_queues should be {} if not defined
Browse files Browse the repository at this point in the history
Previously, calling `reject` when `predefined_queues` was not configured would cause `AttributeError` to be raised from `_extract_backoff_policy_configuration_and_message`. That exception could crash the whole Celery worker and force it to exit early because `AttributeError` is not excepted in the nearby call stack.
  • Loading branch information
infinitewarp authored and auvipy committed Aug 21, 2021
1 parent 37a81bc commit 9a91e8b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
2 changes: 1 addition & 1 deletion kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ def visibility_timeout(self):
@cached_property
def predefined_queues(self):
"""Map of queue_name to predefined queue settings."""
return self.transport_options.get('predefined_queues', None)
return self.transport_options.get('predefined_queues', {})

@cached_property
def queue_name_prefix(self):
Expand Down
18 changes: 18 additions & 0 deletions t/unit/transport/test_SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,24 @@ def test_basic_ack_invalid_receipt_handle(self, basic_reject_mock,
basic_reject_mock.assert_called_with(2)
assert not basic_ack_mock.called

def test_reject_when_no_predefined_queues(self):
connection = Connection(transport=SQS.Transport, transport_options={})
channel = connection.channel()

mock_apply_backoff_policy = Mock()
channel.qos.apply_backoff_policy = mock_apply_backoff_policy
queue_name = "queue-1"

exchange = Exchange('test_SQS', type='direct')
queue = Queue(queue_name, exchange, queue_name)
queue(channel).declare()

message_mock = Mock()
message_mock.delivery_info = {'routing_key': queue_name}
channel.qos._delivered['test_message_id'] = message_mock
channel.qos.reject('test_message_id')
mock_apply_backoff_policy.assert_not_called()

def test_predefined_queues_primes_queue_cache(self):
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
Expand Down

0 comments on commit 9a91e8b

Please sign in to comment.