Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add azure service bus auto lock renewal #1788

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions kombu/transport/azureservicebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
* ``retry_backoff_factor`` - Azure SDK exponential backoff factor.
Default ``0.8``
* ``retry_backoff_max`` - Azure SDK retry total time. Default ``120``
* ``use_lock_renewal`` - Use Azure SDK Auto Lock Renewal. Works only if receive mode ``PEEK_LOCK`` is in use
* ``max_lock_renewal_duration`` - Azure SDK time in seconds that locks registered to a renewer
should be maintained for. Max value is ``300`` (5 minutes)
"""

from __future__ import annotations
Expand All @@ -64,9 +67,9 @@
import azure.core.exceptions
import azure.servicebus.exceptions
import isodate
from azure.servicebus import (ServiceBusClient, ServiceBusMessage,
ServiceBusReceiveMode, ServiceBusReceiver,
ServiceBusSender)
from azure.servicebus import (AutoLockRenewer, ServiceBusClient,
ServiceBusMessage, ServiceBusReceiveMode,
ServiceBusReceiver, ServiceBusSender)
from azure.servicebus.management import ServiceBusAdministrationClient

try:
Expand Down Expand Up @@ -117,6 +120,7 @@ class Channel(virtual.Channel):
default_uamqp_keep_alive_interval: int = 30
# number of retries (is the default from service bus repo)
default_retry_total: int = 3
default_max_lock_renewal_duration: int = 300
# exponential backoff factor (is the default from service bus repo)
default_retry_backoff_factor: float = 0.8
# Max time to backoff (is the default from service bus repo)
Expand Down Expand Up @@ -198,9 +202,15 @@ def _get_asb_receiver(
cache_key = queue_cache_key or queue
queue_obj = self._queue_cache.get(cache_key, None)
if queue_obj is None or queue_obj.receiver is None:
auto_lock_renewer = None
if self.use_lock_renewal and recv_mode == ServiceBusReceiveMode.PEEK_LOCK:
auto_lock_renewer = AutoLockRenewer(
max_lock_renewal_duration=self.max_lock_renewal_duration)

receiver = self.queue_service.get_queue_receiver(
queue_name=queue, receive_mode=recv_mode,
keep_alive=self.uamqp_keep_alive_interval)
keep_alive=self.uamqp_keep_alive_interval,
auto_lock_renewer=auto_lock_renewer)
queue_obj = self._add_queue_to_cache(cache_key, receiver=receiver)
return queue_obj

Expand Down Expand Up @@ -272,6 +282,7 @@ def _get(

# message.body is either byte or generator[bytes]
message = messages[0]

if not isinstance(message.body, bytes):
body = b''.join(message.body)
else:
Expand Down Expand Up @@ -395,6 +406,16 @@ def wait_time_seconds(self) -> int:
return self.transport_options.get('wait_time_seconds',
self.default_wait_time_seconds)

@cached_property
def max_lock_renewal_duration(self) -> int:
return min(self.transport_options.get('max_lock_renewal_duration',
self.default_max_lock_renewal_duration),
self.default_max_lock_renewal_duration)

@cached_property
def use_lock_renewal(self) -> int:
return self.transport_options.get('use_lock_renewal', False)

@cached_property
def peek_lock_seconds(self) -> int:
return min(self.transport_options.get('peek_lock_seconds',
Expand Down
Loading