From 28ae8605a4d954170729356f933ae996045a6397 Mon Sep 17 00:00:00 2001 From: Konstantin Lapkovsky Date: Tue, 12 Sep 2023 20:59:28 +0400 Subject: [PATCH 1/4] Add Azure Service Bus auto lock renewal functionality. --- kombu/transport/azureservicebus.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py index 5aa7c03be..cddb9186a 100644 --- a/kombu/transport/azureservicebus.py +++ b/kombu/transport/azureservicebus.py @@ -53,6 +53,9 @@ * ``retry_backoff_factor`` - Azure SDK exponential backoff factor. Default ``0.8`` * ``retry_backoff_max`` - Azure SDK retry total time. Default ``120`` +* ``use_lock_renewal`` - Use Lock Renewal Azure SDK retry total time. Default ``120`` +* ``max_lock_renewal_duration`` - Azure SDK time in seconds that locks registered to a renewer + should be maintained for. Max value is ``300`` (5 minutes) """ from __future__ import annotations @@ -64,9 +67,9 @@ import azure.core.exceptions import azure.servicebus.exceptions import isodate -from azure.servicebus import (ServiceBusClient, ServiceBusMessage, - ServiceBusReceiveMode, ServiceBusReceiver, - ServiceBusSender) +from azure.servicebus import (AutoLockRenewer, ServiceBusClient, + ServiceBusMessage, ServiceBusReceiveMode, + ServiceBusReceiver, ServiceBusSender) from azure.servicebus.management import ServiceBusAdministrationClient try: @@ -117,6 +120,7 @@ class Channel(virtual.Channel): default_uamqp_keep_alive_interval: int = 30 # number of retries (is the default from service bus repo) default_retry_total: int = 3 + default_max_lock_renewal_duration = 300 # exponential backoff factor (is the default from service bus repo) default_retry_backoff_factor: float = 0.8 # Max time to backoff (is the default from service bus repo) @@ -272,6 +276,15 @@ def _get( # message.body is either byte or generator[bytes] message = messages[0] + + if self.use_lock_renewal: + with self.queue_service.get_queue_receiver( + queue_name=queue, + receive_mode=ServiceBusReceiveMode.PEEK_LOCK, + keep_alive=self.uamqp_keep_alive_interval + ) as receiver, AutoLockRenewer() as lock_renewer: + lock_renewer.register(receiver, message, max_lock_renewal_duration=self.max_lock_renewal_duration) + if not isinstance(message.body, bytes): body = b''.join(message.body) else: @@ -395,6 +408,16 @@ def wait_time_seconds(self) -> int: return self.transport_options.get('wait_time_seconds', self.default_wait_time_seconds) + @cached_property + def max_lock_renewal_duration(self) -> int: + return min(self.transport_options.get('max_lock_renewal_duration', + self.default_max_lock_renewal_duration), + self.default_max_lock_renewal_duration) + + @cached_property + def use_lock_renewal(self) -> int: + return self.transport_options.get('use_lock_renewal', False) + @cached_property def peek_lock_seconds(self) -> int: return min(self.transport_options.get('peek_lock_seconds', From f1e06c2ee69811774a88747a289ee823c89155f6 Mon Sep 17 00:00:00 2001 From: Konstantin Lapkovsky Date: Tue, 12 Sep 2023 21:02:45 +0400 Subject: [PATCH 2/4] Add property type. --- kombu/transport/azureservicebus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py index cddb9186a..a9ea004e8 100644 --- a/kombu/transport/azureservicebus.py +++ b/kombu/transport/azureservicebus.py @@ -120,7 +120,7 @@ class Channel(virtual.Channel): default_uamqp_keep_alive_interval: int = 30 # number of retries (is the default from service bus repo) default_retry_total: int = 3 - default_max_lock_renewal_duration = 300 + default_max_lock_renewal_duration: int = 300 # exponential backoff factor (is the default from service bus repo) default_retry_backoff_factor: float = 0.8 # Max time to backoff (is the default from service bus repo) From 59fe35ef2f800165adc1c508080d8f8a7e7deed7 Mon Sep 17 00:00:00 2001 From: Konstantin Lapkovsky Date: Wed, 13 Sep 2023 16:10:59 +0400 Subject: [PATCH 3/4] Delegate working with AutoLockRenewer to receiver instance. --- kombu/transport/azureservicebus.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py index a9ea004e8..ceb52f83a 100644 --- a/kombu/transport/azureservicebus.py +++ b/kombu/transport/azureservicebus.py @@ -202,9 +202,14 @@ def _get_asb_receiver( cache_key = queue_cache_key or queue queue_obj = self._queue_cache.get(cache_key, None) if queue_obj is None or queue_obj.receiver is None: + auto_lock_renewer = None + if self.use_lock_renewal: + auto_lock_renewer = AutoLockRenewer(max_lock_renewal_duration=self.max_lock_renewal_duration) + receiver = self.queue_service.get_queue_receiver( queue_name=queue, receive_mode=recv_mode, - keep_alive=self.uamqp_keep_alive_interval) + keep_alive=self.uamqp_keep_alive_interval, + auto_lock_renewer=auto_lock_renewer) queue_obj = self._add_queue_to_cache(cache_key, receiver=receiver) return queue_obj @@ -277,14 +282,6 @@ def _get( # message.body is either byte or generator[bytes] message = messages[0] - if self.use_lock_renewal: - with self.queue_service.get_queue_receiver( - queue_name=queue, - receive_mode=ServiceBusReceiveMode.PEEK_LOCK, - keep_alive=self.uamqp_keep_alive_interval - ) as receiver, AutoLockRenewer() as lock_renewer: - lock_renewer.register(receiver, message, max_lock_renewal_duration=self.max_lock_renewal_duration) - if not isinstance(message.body, bytes): body = b''.join(message.body) else: From 73ec4687997d31cf6e49c45d454b8e33cd5015af Mon Sep 17 00:00:00 2001 From: Konstantin Lapkovsky Date: Wed, 13 Sep 2023 21:41:17 +0400 Subject: [PATCH 4/4] Use AutoLockRenewer only if receive mode PEEK_LOCK is in use. --- kombu/transport/azureservicebus.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py index ceb52f83a..89731ae01 100644 --- a/kombu/transport/azureservicebus.py +++ b/kombu/transport/azureservicebus.py @@ -53,7 +53,7 @@ * ``retry_backoff_factor`` - Azure SDK exponential backoff factor. Default ``0.8`` * ``retry_backoff_max`` - Azure SDK retry total time. Default ``120`` -* ``use_lock_renewal`` - Use Lock Renewal Azure SDK retry total time. Default ``120`` +* ``use_lock_renewal`` - Use Azure SDK Auto Lock Renewal. Works only if receive mode ``PEEK_LOCK`` is in use * ``max_lock_renewal_duration`` - Azure SDK time in seconds that locks registered to a renewer should be maintained for. Max value is ``300`` (5 minutes) """ @@ -203,8 +203,9 @@ def _get_asb_receiver( queue_obj = self._queue_cache.get(cache_key, None) if queue_obj is None or queue_obj.receiver is None: auto_lock_renewer = None - if self.use_lock_renewal: - auto_lock_renewer = AutoLockRenewer(max_lock_renewal_duration=self.max_lock_renewal_duration) + if self.use_lock_renewal and recv_mode == ServiceBusReceiveMode.PEEK_LOCK: + auto_lock_renewer = AutoLockRenewer( + max_lock_renewal_duration=self.max_lock_renewal_duration) receiver = self.queue_service.get_queue_receiver( queue_name=queue, receive_mode=recv_mode,