diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py index 2ade0672b434..8576188f5c1b 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py @@ -129,7 +129,6 @@ def _populate_message_properties(self, message): def _enhanced_message_received(self, frame, message): # pylint: disable=protected-access - # self._handler._was_message_received = True if self._receive_context.is_set(): self._handler._received_messages.put((frame, message)) else: @@ -137,7 +136,6 @@ def _enhanced_message_received(self, frame, message): async def _enhanced_message_received_async(self, frame, message): # pylint: disable=protected-access - # self._handler._was_message_received = True if self._receive_context.is_set(): self._handler._received_messages.put((frame, message)) else: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_client_async.py index 6e8524648bc0..cf4218d8a40f 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_client_async.py @@ -137,7 +137,7 @@ class AMQPClientAsync(AMQPClientSync): """ def __init__(self, hostname, **kwargs): - self._lock_async = asyncio.Lock() + self._mgmt_link_lock_async = asyncio.Lock() super().__init__(hostname,**kwargs) @@ -364,7 +364,7 @@ async def mgmt_request_async(self, message, **kwargs): operation_type = kwargs.pop("operation_type", None) node = kwargs.pop("node", "$management") timeout = kwargs.pop('timeout', 0) - async with self._lock_async: + async with self._mgmt_link_lock_async: try: mgmt_link = self._mgmt_links[node] except KeyError: @@ -372,8 +372,8 @@ async def mgmt_request_async(self, message, **kwargs): self._mgmt_links[node] = mgmt_link await mgmt_link.open() - while not await mgmt_link.ready(): - await self._connection.listen(wait=False) + while not await mgmt_link.ready(): + await self._connection.listen(wait=False) operation_type = operation_type or b'empty' status, description, response = await mgmt_link.execute( diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/client.py index 526703ebf51a..ce462bf32fac 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/client.py @@ -164,10 +164,10 @@ def __init__(self, hostname, **kwargs): self._cbs_authenticator = None self._auth_timeout = kwargs.pop("auth_timeout", DEFAULT_AUTH_TIMEOUT) self._mgmt_links = {} + self._mgmt_link_lock = threading.Lock() self._retry_policy = kwargs.pop("retry_policy", RetryPolicy()) self._keep_alive_interval = int(kwargs.get("keep_alive_interval", 0)) self._keep_alive_thread = None - self._lock = threading.Lock() # Connection settings self._max_frame_size = kwargs.pop("max_frame_size", MAX_FRAME_SIZE_BYTES) @@ -436,7 +436,7 @@ def mgmt_request(self, message, **kwargs): operation_type = kwargs.pop("operation_type", None) node = kwargs.pop("node", "$management") timeout = kwargs.pop("timeout", 0) - with self._lock: + with self._mgmt_link_lock: try: mgmt_link = self._mgmt_links[node] except KeyError: @@ -444,8 +444,9 @@ def mgmt_request(self, message, **kwargs): self._mgmt_links[node] = mgmt_link mgmt_link.open() - while not mgmt_link.ready(): - self._connection.listen(wait=False) + while not mgmt_link.ready(): + self._connection.listen(wait=False) + operation_type = operation_type or b"empty" status, description, response = mgmt_link.execute( message, operation=operation, operation_type=operation_type, timeout=timeout @@ -792,7 +793,7 @@ def __init__(self, hostname, source, **kwargs): self._link_credit = kwargs.pop("link_credit", 300) self._timeout = kwargs.pop("timeout", 0) self._timeout_reached = False - self._last_activity_stamp = time.time() + self._last_activity_timestamp = time.time() self._running_iter = False super(ReceiveClient, self).__init__(hostname, **kwargs) @@ -834,7 +835,6 @@ def _client_run(self, **kwargs): self._link.flow() self._connection.listen(wait=self._socket_timeout, **kwargs) self._timeout_reached = False - except ValueError: _logger.info("Timeout reached, closing receiver.", extra=self._network_trace_params) self._shutdown = True @@ -949,7 +949,7 @@ def _message_generator(self): self._timeout_reached = False receiving = True message = None - self._last_activity_stamp = time.time() + self._last_activity_timestamp = time.time() try: while receiving and not self._timeout_reached: if not self._running_iter: @@ -964,7 +964,7 @@ def _message_generator(self): while not self._received_messages.empty(): message = self._received_messages.get() - self._last_activity_stamp = time.time() + self._last_activity_timestamp = time.time() self._received_messages.task_done() yield message diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index cf27016524e0..7b714a10d7e2 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -2248,10 +2248,10 @@ async def hack_iter_next_mock_error(self): await self._open() # when trying to receive the second message (execution_times is 1), raising LinkDetach error to mock 10 mins idle timeout if self.execution_times == 1: - from azure.servicebus._pyamqp.error import ErrorCondition, AMQPConnectionError + from azure.servicebus._pyamqp.error import ErrorCondition, AMQPLinkError self.execution_times += 1 self.error_raised = True - raise AMQPConnectionError(condition=ErrorCondition.LinkDetachForced) + raise AMQPLinkError(condition=ErrorCondition.LinkDetachForced) else: self.execution_times += 1 if not self._message_iter: diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 5976a7061545..0800fc3881e8 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -320,16 +320,16 @@ def sub_test_non_releasing_messages(): receiver = sb_client.get_queue_receiver(servicebus_queue.name) sender = sb_client.get_queue_sender(servicebus_queue.name) - # def _hack_disable_receive_context_message_received(self, message): - # # pylint: disable=protected-access - # # self._handler._was_message_received = True - # self._handler._received_messages.put(message) + def _hack_disable_receive_context_message_received(self, message): + # pylint: disable=protected-access + # self._handler._was_message_received = True + self._handler._received_messages.put(message) with sender, receiver: # send 5 msgs to queue first sender.send_messages([ServiceBusMessage('test') for _ in range(5)]) - # receiver._handler.on_message_received = types.MethodType( - # _hack_disable_receive_context_message_received, receiver) + receiver._handler._message_received = types.MethodType( + _hack_disable_receive_context_message_received, receiver) received_msgs = [] while len(received_msgs) < 5: # issue 10 link credits, client should consume 5 msgs from the service @@ -352,18 +352,18 @@ def sub_test_non_releasing_messages(): for msg in received_msgs: # queue ordering I think assert msg.delivery_count == 0 - # with pytest.raises(ServiceBusError): - receiver.complete_message(msg) + with pytest.raises(ServiceBusError): + receiver.complete_message(msg) # re-received message with delivery count increased - # target_msgs_count = 5 - # received_msgs = [] - # while len(received_msgs) < target_msgs_count: - # received_msgs.extend(receiver.receive_messages(max_message_count=5, max_wait_time=5)) - # assert len(received_msgs) == 5 - # for msg in received_msgs: - # assert msg.delivery_count > 0 - # receiver.complete_message(msg) + target_msgs_count = 5 + received_msgs = [] + while len(received_msgs) < target_msgs_count: + received_msgs.extend(receiver.receive_messages(max_message_count=5, max_wait_time=5)) + assert len(received_msgs) == 5 + for msg in received_msgs: + assert msg.delivery_count > 0 + receiver.complete_message(msg) sub_test_releasing_messages() sub_test_releasing_messages_iterator() @@ -2647,10 +2647,10 @@ def hack_iter_next_mock_error(self): self._open() # when trying to receive the second message (execution_times is 1), raising LinkDetach error to mock 10 mins idle timeout if self.execution_times == 1: - from azure.servicebus._pyamqp.error import ErrorCondition, AMQPConnectionError + from azure.servicebus._pyamqp.error import ErrorCondition, AMQPLinkError self.execution_times += 1 self.error_raised = True - raise AMQPConnectionError(condition=ErrorCondition.LinkDetachForced) + raise AMQPLinkError(condition=ErrorCondition.LinkDetachForced) else: self.execution_times += 1 if not self._message_iter: