Skip to content

Commit

Permalink
pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
l0lawrence committed Feb 21, 2023
1 parent 461ac89 commit f8c96f2
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,13 @@ def _populate_message_properties(self, message):

def _enhanced_message_received(self, frame, message):
# pylint: disable=protected-access
# self._handler._was_message_received = True
if self._receive_context.is_set():
self._handler._received_messages.put((frame, message))
else:
self._handler.settle_messages(frame[1], 'released')

async def _enhanced_message_received_async(self, frame, message):
# pylint: disable=protected-access
# self._handler._was_message_received = True
if self._receive_context.is_set():
self._handler._received_messages.put((frame, message))
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class AMQPClientAsync(AMQPClientSync):
"""

def __init__(self, hostname, **kwargs):
self._lock_async = asyncio.Lock()
self._mgmt_link_lock_async = asyncio.Lock()
super().__init__(hostname,**kwargs)


Expand Down Expand Up @@ -364,16 +364,16 @@ async def mgmt_request_async(self, message, **kwargs):
operation_type = kwargs.pop("operation_type", None)
node = kwargs.pop("node", "$management")
timeout = kwargs.pop('timeout', 0)
async with self._lock_async:
async with self._mgmt_link_lock_async:
try:
mgmt_link = self._mgmt_links[node]
except KeyError:
mgmt_link = ManagementOperation(self._session, endpoint=node, **kwargs)
self._mgmt_links[node] = mgmt_link
await mgmt_link.open()

while not await mgmt_link.ready():
await self._connection.listen(wait=False)
while not await mgmt_link.ready():
await self._connection.listen(wait=False)

operation_type = operation_type or b'empty'
status, description, response = await mgmt_link.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ def __init__(self, hostname, **kwargs):
self._cbs_authenticator = None
self._auth_timeout = kwargs.pop("auth_timeout", DEFAULT_AUTH_TIMEOUT)
self._mgmt_links = {}
self._mgmt_link_lock = threading.Lock()
self._retry_policy = kwargs.pop("retry_policy", RetryPolicy())
self._keep_alive_interval = int(kwargs.get("keep_alive_interval", 0))
self._keep_alive_thread = None
self._lock = threading.Lock()

# Connection settings
self._max_frame_size = kwargs.pop("max_frame_size", MAX_FRAME_SIZE_BYTES)
Expand Down Expand Up @@ -436,16 +436,17 @@ def mgmt_request(self, message, **kwargs):
operation_type = kwargs.pop("operation_type", None)
node = kwargs.pop("node", "$management")
timeout = kwargs.pop("timeout", 0)
with self._lock:
with self._mgmt_link_lock:
try:
mgmt_link = self._mgmt_links[node]
except KeyError:
mgmt_link = ManagementOperation(self._session, endpoint=node, **kwargs)
self._mgmt_links[node] = mgmt_link
mgmt_link.open()

while not mgmt_link.ready():
self._connection.listen(wait=False)
while not mgmt_link.ready():
self._connection.listen(wait=False)

operation_type = operation_type or b"empty"
status, description, response = mgmt_link.execute(
message, operation=operation, operation_type=operation_type, timeout=timeout
Expand Down Expand Up @@ -792,7 +793,7 @@ def __init__(self, hostname, source, **kwargs):
self._link_credit = kwargs.pop("link_credit", 300)
self._timeout = kwargs.pop("timeout", 0)
self._timeout_reached = False
self._last_activity_stamp = time.time()
self._last_activity_timestamp = time.time()
self._running_iter = False
super(ReceiveClient, self).__init__(hostname, **kwargs)

Expand Down Expand Up @@ -834,7 +835,6 @@ def _client_run(self, **kwargs):
self._link.flow()
self._connection.listen(wait=self._socket_timeout, **kwargs)
self._timeout_reached = False

except ValueError:
_logger.info("Timeout reached, closing receiver.", extra=self._network_trace_params)
self._shutdown = True
Expand Down Expand Up @@ -949,7 +949,7 @@ def _message_generator(self):
self._timeout_reached = False
receiving = True
message = None
self._last_activity_stamp = time.time()
self._last_activity_timestamp = time.time()
try:
while receiving and not self._timeout_reached:
if not self._running_iter:
Expand All @@ -964,7 +964,7 @@ def _message_generator(self):

while not self._received_messages.empty():
message = self._received_messages.get()
self._last_activity_stamp = time.time()
self._last_activity_timestamp = time.time()
self._received_messages.task_done()
yield message

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2248,10 +2248,10 @@ async def hack_iter_next_mock_error(self):
await self._open()
# when trying to receive the second message (execution_times is 1), raising LinkDetach error to mock 10 mins idle timeout
if self.execution_times == 1:
from azure.servicebus._pyamqp.error import ErrorCondition, AMQPConnectionError
from azure.servicebus._pyamqp.error import ErrorCondition, AMQPLinkError
self.execution_times += 1
self.error_raised = True
raise AMQPConnectionError(condition=ErrorCondition.LinkDetachForced)
raise AMQPLinkError(condition=ErrorCondition.LinkDetachForced)
else:
self.execution_times += 1
if not self._message_iter:
Expand Down
36 changes: 18 additions & 18 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,16 +320,16 @@ def sub_test_non_releasing_messages():
receiver = sb_client.get_queue_receiver(servicebus_queue.name)
sender = sb_client.get_queue_sender(servicebus_queue.name)

# def _hack_disable_receive_context_message_received(self, message):
# # pylint: disable=protected-access
# # self._handler._was_message_received = True
# self._handler._received_messages.put(message)
def _hack_disable_receive_context_message_received(self, message):
# pylint: disable=protected-access
# self._handler._was_message_received = True
self._handler._received_messages.put(message)

with sender, receiver:
# send 5 msgs to queue first
sender.send_messages([ServiceBusMessage('test') for _ in range(5)])
# receiver._handler.on_message_received = types.MethodType(
# _hack_disable_receive_context_message_received, receiver)
receiver._handler._message_received = types.MethodType(
_hack_disable_receive_context_message_received, receiver)
received_msgs = []
while len(received_msgs) < 5:
# issue 10 link credits, client should consume 5 msgs from the service
Expand All @@ -352,18 +352,18 @@ def sub_test_non_releasing_messages():
for msg in received_msgs:
# queue ordering I think
assert msg.delivery_count == 0
# with pytest.raises(ServiceBusError):
receiver.complete_message(msg)
with pytest.raises(ServiceBusError):
receiver.complete_message(msg)

# re-received message with delivery count increased
# target_msgs_count = 5
# received_msgs = []
# while len(received_msgs) < target_msgs_count:
# received_msgs.extend(receiver.receive_messages(max_message_count=5, max_wait_time=5))
# assert len(received_msgs) == 5
# for msg in received_msgs:
# assert msg.delivery_count > 0
# receiver.complete_message(msg)
target_msgs_count = 5
received_msgs = []
while len(received_msgs) < target_msgs_count:
received_msgs.extend(receiver.receive_messages(max_message_count=5, max_wait_time=5))
assert len(received_msgs) == 5
for msg in received_msgs:
assert msg.delivery_count > 0
receiver.complete_message(msg)

sub_test_releasing_messages()
sub_test_releasing_messages_iterator()
Expand Down Expand Up @@ -2647,10 +2647,10 @@ def hack_iter_next_mock_error(self):
self._open()
# when trying to receive the second message (execution_times is 1), raising LinkDetach error to mock 10 mins idle timeout
if self.execution_times == 1:
from azure.servicebus._pyamqp.error import ErrorCondition, AMQPConnectionError
from azure.servicebus._pyamqp.error import ErrorCondition, AMQPLinkError
self.execution_times += 1
self.error_raised = True
raise AMQPConnectionError(condition=ErrorCondition.LinkDetachForced)
raise AMQPLinkError(condition=ErrorCondition.LinkDetachForced)
else:
self.execution_times += 1
if not self._message_iter:
Expand Down

0 comments on commit f8c96f2

Please sign in to comment.