Skip to content

Commit

Permalink
[ServiceBus] ServiceBus Operation Timeout Support (#13854)
Browse files Browse the repository at this point in the history
* sync opeartion timeout update

* add support for operation timeout to certain methods

* small fix

* fix 2.7 syntax error

* fix mypy

* fix mypy

* add test and fix a small bug

* improve code

* improve test code

* Update sdk/servicebus/azure-servicebus/CHANGELOG.md

Co-authored-by: KieranBrantnerMagee <[email protected]>

* move timeout into kwargs in async

* addressing pr review and update uamqp dependency to include the latest fix

* remove configuration check

* fix bug in test hacking

* fix bug where timeout=0 in tests

* tweak changelog position

* update changelog

* add try-finally in timeout test cases

Co-authored-by: KieranBrantnerMagee <[email protected]>
  • Loading branch information
yunhaoling and KieranBrantnerMagee authored Oct 7, 2020
1 parent b111f08 commit f06ccf0
Show file tree
Hide file tree
Showing 21 changed files with 497 additions and 156 deletions.
13 changes: 13 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,23 @@

## 7.0.0b8 (Unreleased)

**New Features**

* Added support for `timeout` parameter on the following operations:
- `ServiceBusSender`: `send_messages`, `schedule_messages` and `cancel_scheduled_messages`
- `ServiceBusReceiver`: `receive_deferred_messages` and `peek_messages`
- `ServiceBusSession`: `get_state`, `set_state` and `renew_lock`
- `ReceivedMessage`: `renew_lock`

**BugFixes**

* Updated uAMQP dependency to 1.2.11.
- Fixed bug where amqp message `footer` and `delivery_annotation` were not encoded into the outgoing payload.

## 7.0.0b7 (2020-10-05)

**Breaking Changes**

* Passing any type other than `ReceiveMode` as parameter `receive_mode` now throws a `TypeError` instead of `AttributeError`.
* Administration Client calls now take only entity names, not `<Entity>Descriptions` as well to reduce ambiguity in which entity was being acted on. TypeError will now be thrown on improper parameter types (non-string).
* `AMQPMessage` (`Message.amqp_message`) properties are now read-only, changes of these properties would not be reflected in the underlying message. This may be subject to change before GA.
Expand Down
56 changes: 44 additions & 12 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from urllib.parse import quote_plus

import uamqp
from uamqp import utils
from uamqp import utils, compat
from uamqp.message import MessageProperties

from azure.core.credentials import AccessToken
Expand All @@ -24,6 +24,7 @@
from .exceptions import (
ServiceBusError,
ServiceBusAuthenticationError,
OperationTimeoutError,
_create_servicebus_exception
)
from ._common.utils import create_properties
Expand Down Expand Up @@ -233,14 +234,14 @@ def _backoff(
self,
retried_times,
last_exception,
timeout=None,
abs_timeout_time=None,
entity_name=None
):
# type: (int, Exception, Optional[float], str) -> None
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
if backoff <= self._config.retry_backoff_max and (
timeout is None or backoff <= timeout
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
): # pylint:disable=no-else-return
time.sleep(backoff)
_LOGGER.info(
Expand All @@ -259,14 +260,17 @@ def _backoff(
def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
require_last_exception = kwargs.pop("require_last_exception", False)
require_timeout = kwargs.pop("require_timeout", False)
operation_requires_timeout = kwargs.pop("operation_requires_timeout", False)
retried_times = 0
max_retries = self._config.retry_total

abs_timeout_time = (time.time() + timeout) if (operation_requires_timeout and timeout) else None

while retried_times <= max_retries:
try:
if require_timeout:
kwargs["timeout"] = timeout
if operation_requires_timeout and abs_timeout_time:
remaining_timeout = abs_timeout_time - time.time()
kwargs["timeout"] = remaining_timeout
return operation(**kwargs)
except StopIteration:
raise
Expand All @@ -285,13 +289,37 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs):
self._backoff(
retried_times=retried_times,
last_exception=last_exception,
timeout=timeout
abs_timeout_time=abs_timeout_time
)

def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_associated_link=True, **kwargs):
# type: (str, uamqp.Message, Callable, bool, Any) -> uamqp.Message
def _mgmt_request_response(
self,
mgmt_operation,
message,
callback,
keep_alive_associated_link=True,
timeout=None,
**kwargs
):
# type: (bytes, uamqp.Message, Callable, bool, Optional[float], Any) -> uamqp.Message
"""
Execute an amqp management operation.
:param bytes mgmt_operation: The type of operation to be performed. This value will
be service-specific, but common values include READ, CREATE and UPDATE.
This value will be added as an application property on the message.
:param message: The message to send in the management request.
:paramtype message: ~uamqp.message.Message
:param callback: The callback which is used to parse the returning message.
:paramtype callback: Callable[int, ~uamqp.message.Message, str]
:param keep_alive_associated_link: A boolean flag for keeping associated amqp sender/receiver link alive when
executing operation on mgmt links.
:param timeout: timeout in seconds executing the mgmt operation.
:rtype: None
"""
self._open()
application_properties = {}

# Some mgmt calls do not support an associated link name (such as list_sessions). Most do, so on by default.
if keep_alive_associated_link:
try:
Expand All @@ -314,19 +342,23 @@ def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_a
mgmt_operation,
op_type=MGMT_REQUEST_OP_TYPE_ENTITY_MGMT,
node=self._mgmt_target.encode(self._config.encoding),
timeout=5000,
timeout=timeout * 1000 if timeout else None,
callback=callback
)
except Exception as exp: # pylint: disable=broad-except
if isinstance(exp, compat.TimeoutException):
raise OperationTimeoutError("Management operation timed out.", inner_exception=exp)
raise ServiceBusError("Management request failed: {}".format(exp), exp)

def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, **kwargs):
# type: (bytes, Dict[str, Any], Callable, Any) -> Any
def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, timeout=None, **kwargs):
# type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any
return self._do_retryable_operation(
self._mgmt_request_response,
mgmt_operation=mgmt_operation,
message=message,
callback=callback,
timeout=timeout,
operation_requires_timeout=True,
**kwargs
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ def __init__(self, **kwargs):
if self.http_proxy
else kwargs.get("transport_type", TransportType.Amqp)
)
# The following configs are not public, for internal usage only
self.auth_timeout = kwargs.get("auth_timeout", 60) # type: int
self.encoding = kwargs.get("encoding", "UTF-8")
self.auto_reconnect = kwargs.get("auto_reconnect", True)
self.keep_alive = kwargs.get("keep_alive", 30)
self.timeout = kwargs.get("timeout", 60) # type: float
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,6 @@ def _to_outgoing_message(self):
via_partition_key=self.via_partition_key
)


@property
def dead_letter_error_description(self):
# type: () -> Optional[str]
Expand Down Expand Up @@ -1043,8 +1042,8 @@ def defer(self):
self._settle_message(MESSAGE_DEFER)
self._settled = True

def renew_lock(self):
# type: () -> datetime.datetime
def renew_lock(self, **kwargs):
# type: (Any) -> datetime.datetime
# pylint: disable=protected-access,no-member
"""Renew the message lock.
Expand All @@ -1060,14 +1059,16 @@ def renew_lock(self):
Lock renewal can be performed as a background task by registering the message with an
`azure.servicebus.AutoLockRenew` instance.
:keyword float timeout: The total operation timeout in seconds including all the retries. The value must be
greater than 0 if specified. The default value is None, meaning no timeout.
:returns: The utc datetime the lock is set to expire at.
:rtype: datetime.datetime
:raises: TypeError if the message is sessionful.
:raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled.
"""
try:
if self._receiver.session: # type: ignore
if self._receiver.session: # type: ignore
raise TypeError("Session messages cannot be renewed. Please renew the Session lock instead.")
except AttributeError:
pass
Expand All @@ -1076,8 +1077,12 @@ def renew_lock(self):
if not token:
raise ValueError("Unable to renew lock - no lock token found.")

expiry = self._receiver._renew_locks(token) # type: ignore
self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) # type: datetime.datetime
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")

expiry = self._receiver._renew_locks(token, timeout=timeout) # type: ignore
self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) # type: datetime.datetime

return self._expiry

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,6 @@ def _open(self):
self.close()
raise

def close(self):
# type: () -> None
super(ServiceBusReceiver, self).close()
self._message_iter = None # pylint: disable=attribute-defined-outside-init

def _receive(self, max_message_count=None, timeout=None):
# type: (Optional[int], Optional[float]) -> List[ReceivedMessage]
# pylint: disable=protected-access
Expand Down Expand Up @@ -276,15 +271,22 @@ def _settle_message(self, settlement, lock_tokens, dead_letter_details=None):
mgmt_handlers.default
)

def _renew_locks(self, *lock_tokens):
# type: (str) -> Any
def _renew_locks(self, *lock_tokens, **kwargs):
# type: (str, Any) -> Any
timeout = kwargs.pop("timeout", None)
message = {MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens)}
return self._mgmt_request_response_with_retry(
REQUEST_RESPONSE_RENEWLOCK_OPERATION,
message,
mgmt_handlers.lock_renew_op
mgmt_handlers.lock_renew_op,
timeout=timeout
)

def close(self):
# type: () -> None
super(ServiceBusReceiver, self).close()
self._message_iter = None # pylint: disable=attribute-defined-outside-init

def get_streaming_message_iter(self, max_wait_time=None):
# type: (float) -> Iterator[ReceivedMessage]
"""Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until
Expand Down Expand Up @@ -413,18 +415,20 @@ def receive_messages(self, max_message_count=None, max_wait_time=None):
self._receive,
max_message_count=max_message_count,
timeout=max_wait_time,
require_timeout=True
operation_requires_timeout=True
)

def receive_deferred_messages(self, sequence_numbers):
# type: (Union[int,List[int]]) -> List[ReceivedMessage]
def receive_deferred_messages(self, sequence_numbers, **kwargs):
# type: (Union[int,List[int]], Any) -> List[ReceivedMessage]
"""Receive messages that have previously been deferred.
When receiving deferred messages from a partitioned entity, all of the supplied
sequence numbers must be messages from the same partition.
:param Union[int,List[int]] sequence_numbers: A list of the sequence numbers of messages that have been
deferred.
:keyword float timeout: The total operation timeout in seconds including all the retries. The value must be
greater than 0 if specified. The default value is None, meaning no timeout.
:rtype: List[~azure.servicebus.ReceivedMessage]
.. admonition:: Example:
Expand All @@ -438,6 +442,9 @@ def receive_deferred_messages(self, sequence_numbers):
"""
self._check_live()
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
if isinstance(sequence_numbers, six.integer_types):
sequence_numbers = [sequence_numbers]
if not sequence_numbers:
Expand All @@ -458,20 +465,23 @@ def receive_deferred_messages(self, sequence_numbers):
messages = self._mgmt_request_response_with_retry(
REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
message,
handler
handler,
timeout=timeout
)
return messages

def peek_messages(self, max_message_count=1, sequence_number=None):
# type: (int, Optional[int]) -> List[PeekedMessage]
def peek_messages(self, max_message_count=1, **kwargs):
# type: (int, Any) -> List[PeekedMessage]
"""Browse messages currently pending in the queue.
Peeked messages are not removed from queue, nor are they locked. They cannot be completed,
deferred or dead-lettered.
:param int max_message_count: The maximum number of messages to try and peek. The default
value is 1.
:param int sequence_number: A message sequence number from which to start browsing messages.
:keyword int sequence_number: A message sequence number from which to start browsing messages.
:keyword float timeout: The total operation timeout in seconds including all the retries. The value must be
greater than 0 if specified. The default value is None, meaning no timeout.
:rtype: List[~azure.servicebus.PeekedMessage]
Expand All @@ -486,6 +496,10 @@ def peek_messages(self, max_message_count=1, sequence_number=None):
"""
self._check_live()
sequence_number = kwargs.pop("sequence_number", 0)
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
if not sequence_number:
sequence_number = self._last_received_sequenced_number or 1
if int(max_message_count) < 1:
Expand All @@ -504,5 +518,6 @@ def peek_messages(self, max_message_count=1, sequence_number=None):
return self._mgmt_request_response_with_retry(
REQUEST_RESPONSE_PEEK_OPERATION,
message,
mgmt_handlers.peek_op
mgmt_handlers.peek_op,
timeout=timeout
)
Loading

0 comments on commit f06ccf0

Please sign in to comment.