Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Iterator Support #28558

Merged
Merged
Show file tree
Hide file tree
Changes from 69 commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
3ee359e
iterator
l0lawrence Jan 31, 2023
05250c7
add todo
l0lawrence Jan 31, 2023
88b1120
remove logger
l0lawrence Jan 31, 2023
1f5b679
tests
l0lawrence Jan 31, 2023
e18a804
skip serialization and pyamp transport errors
l0lawrence Feb 1, 2023
658dbde
add in keep-alive for releasing messages
l0lawrence Feb 2, 2023
3b2cf98
try this for client.py
l0lawrence Feb 3, 2023
aff32f6
trying to fix iterator vs normal receiving timeout for releasing mess…
l0lawrence Feb 5, 2023
22e91fa
remove part of test failing for sync release -- doesnt work on pyamqp
l0lawrence Feb 6, 2023
b5aa858
make message yeilding a while loop
l0lawrence Feb 6, 2023
2aabbbc
copying changes to async
l0lawrence Feb 6, 2023
9dcdbff
fixing async release test
l0lawrence Feb 6, 2023
9749542
message_received was set incorrectly/ diff than uamqp
l0lawrence Feb 6, 2023
243bdfc
fix async
l0lawrence Feb 6, 2023
87ee205
fixing closing order logic
l0lawrence Feb 6, 2023
476fa61
receive_contxt, fix test
l0lawrence Feb 7, 2023
79f5ca6
another receive_context
l0lawrence Feb 7, 2023
b22e664
asyncio.lock
l0lawrence Feb 7, 2023
a57e553
receiver_context
l0lawrence Feb 7, 2023
692d123
try to ignore sync for now
l0lawrence Feb 7, 2023
575e3ed
async with lock
l0lawrence Feb 7, 2023
1100434
receive_context
l0lawrence Feb 7, 2023
0616a77
remove print statements
l0lawrence Feb 8, 2023
17a5114
unskip sync
l0lawrence Feb 8, 2023
87eebfa
fix pylint and mypy
l0lawrence Feb 8, 2023
80ab91a
skip tests except for failing one
l0lawrence Feb 8, 2023
d63fb83
mark not mock
l0lawrence Feb 8, 2023
89beb14
socket read timeout set to 1 but was .2 on sync
l0lawrence Feb 9, 2023
33e1e5d
run all tests with new timeout
l0lawrence Feb 9, 2023
ce794c5
pr comments
l0lawrence Feb 9, 2023
43f110e
pr comments
l0lawrence Feb 9, 2023
5fb3423
typing add back
l0lawrence Feb 9, 2023
5e12e61
remove todo
l0lawrence Feb 9, 2023
e300e67
time to live - ttl
l0lawrence Feb 10, 2023
4b2cfb5
async init
l0lawrence Feb 10, 2023
461ac89
async init
l0lawrence Feb 10, 2023
f8c96f2
pr comments
l0lawrence Feb 21, 2023
adaaa9c
Revert "pr comments"
l0lawrence Feb 21, 2023
8e6de66
pr comments - lock rename
l0lawrence Feb 21, 2023
378c6e5
pr comments - remove timeout setting for uamqp
l0lawrence Feb 21, 2023
c319e7b
tests
l0lawrence Feb 21, 2023
7749507
set link credit in connection listen on keep alive
l0lawrence Feb 22, 2023
cb3132b
link async
l0lawrence Feb 22, 2023
87193d3
change pop to get
l0lawrence Feb 22, 2023
570f443
try flow before connection
l0lawrence Feb 23, 2023
e2b4528
test
l0lawrence Feb 23, 2023
d9cae81
link credit not bein kept bc of flow in client_run
l0lawrence Feb 23, 2023
b426a2b
if link credit is 0, reset it
l0lawrence Feb 24, 2023
dacadbc
add wait_time to tests
l0lawrence Feb 24, 2023
5f66d18
time override test
l0lawrence Feb 24, 2023
873fdc9
Connection to Link Error
l0lawrence Feb 24, 2023
cba0a95
sleep fix
l0lawrence Feb 24, 2023
63ab697
link_credit
l0lawrence Feb 24, 2023
126bf66
missing _ in test
l0lawrence Feb 24, 2023
921e0a7
remove boolean flag
l0lawrence Feb 25, 2023
04f6e04
need actibvity timestamp in yeild message
l0lawrence Feb 25, 2023
f0246db
missing "_"
l0lawrence Feb 26, 2023
97ed577
stamp ->timestamp
l0lawrence Feb 27, 2023
9c43db8
dont fix EH here
l0lawrence Feb 27, 2023
4ae5a80
formatting pylint
l0lawrence Feb 27, 2023
736356d
sock timeout async
l0lawrence Feb 27, 2023
d8fe2de
whitespace
l0lawrence Feb 27, 2023
a73f8c1
remove __aiter__
l0lawrence Feb 27, 2023
ee4f6e0
need iter
l0lawrence Feb 27, 2023
0f43c7e
remove todo
l0lawrence Feb 27, 2023
23a2935
todo
l0lawrence Feb 27, 2023
bb94d66
whitespace
l0lawrence Feb 27, 2023
6fdfd83
iter_context
l0lawrence Feb 27, 2023
0ae02bb
remove self
l0lawrence Feb 27, 2023
8b6eb3e
tests - remove #pytest.skip
l0lawrence Mar 3, 2023
3e2337a
add timeouts to constants
l0lawrence Mar 3, 2023
7e06b91
merge together if statements
l0lawrence Mar 3, 2023
312cae9
timeout
l0lawrence Mar 5, 2023
bd4ded5
missing if
l0lawrence Mar 6, 2023
822c47d
pylint
l0lawrence Mar 6, 2023
6121ffa
pylint
l0lawrence Mar 6, 2023
2065203
whitespace
l0lawrence Mar 6, 2023
d565dc5
todo
l0lawrence Mar 7, 2023
a762e21
if statement :)
l0lawrence Mar 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# license information.
# -------------------------------------------------------------------------
import uuid

import time
from .._pyamqp.endpoints import Source
from .message import ServiceBusReceivedMessage
from ..exceptions import _ServiceBusErrorPolicy, MessageAlreadySettled
Expand Down Expand Up @@ -129,8 +129,16 @@ def _populate_message_properties(self, message):

def _enhanced_message_received(self, frame, message):
# pylint: disable=protected-access
self._handler._was_message_received = True
self._handler._last_activity_timestamp = time.time()
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
if self._receive_context.is_set():
self._handler._received_messages.put((frame, message))
else:
self._handler.settle_messages(frame[1], 'released')

l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
async def _enhanced_message_received_async(self, frame, message):
# pylint: disable=protected-access
self._handler._last_activity_timestamp = time.time()
if self._receive_context.is_set():
self._handler._received_messages.put((frame, message))
else:
await self._handler.settle_messages_async(frame[1], 'released')
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ class AMQPClientAsync(AMQPClientSync):
Default is None in which case `certifi.where()` will be used.
:paramtype connection_verify: str
"""

def __init__(self, hostname, **kwargs):
self._mgmt_link_lock_async = asyncio.Lock()
super().__init__(hostname,**kwargs)


async def _keep_alive_async(self):
start_time = time.time()
try:
Expand All @@ -147,7 +153,8 @@ async def _keep_alive_async(self):
self.__class__.__name__,
extra=self._network_trace_params
)
await asyncio.shield(self._connection.work_async())
await asyncio.shield(self._connection.listen(wait=self._socket_timeout,
batch=self._link.current_link_credit))
start_time = current_time
await asyncio.sleep(1)
except Exception as e: # pylint: disable=broad-except
Expand Down Expand Up @@ -263,10 +270,9 @@ async def open_async(self, connection=None):
self._network_trace_params["amqpConnection"] = self._connection._container_id
self._network_trace_params["amqpSession"] = self._session.name
self._shutdown = False
# TODO: Looks like this is broken - should re-enable later and test
# correct empty frame behaviour
# if self._keep_alive_interval:
# self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async())

if self._keep_alive_interval:
self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the empty frame behaviour - have you actually seen empty frames in the amqp trace? I don't know if it's ever been tested....

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When running a keep receiver alive check, I am not seeing any empty frames coming in across the wire or going out

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably file an issue to investigate that separately.
I definitely observed the passing of empty frames using uamqp - so the services (can't recall if it was EH or SB...) do support it. Just not convinced the pyamqp has it correctly implemented just yet :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have seen empty frames come across in EH when the service wants to let the client know there aren't any messages for it to send. I would think the keep alive thread would ignore it completely and let the thread receiving it deal with it.


async def close_async(self):
"""Close the client asynchronously. This includes closing the Session
Expand All @@ -277,9 +283,6 @@ async def close_async(self):
self._shutdown = True
if not self._session:
return # already closed.
if self._keep_alive_thread:
await self._keep_alive_thread
self._keep_alive_thread = None
await self._close_link_async()
if self._cbs_authenticator:
await self._cbs_authenticator.close()
Expand All @@ -289,6 +292,9 @@ async def close_async(self):
if not self._external_connection:
await self._connection.close()
self._connection = None
if self._keep_alive_thread:
await self._keep_alive_thread
self._keep_alive_thread = None
self._network_trace_params["amqpConnection"] = None
self._network_trace_params["amqpSession"] = None

Expand Down Expand Up @@ -360,15 +366,16 @@ async def mgmt_request_async(self, message, **kwargs):
operation_type = kwargs.pop("operation_type", None)
node = kwargs.pop("node", "$management")
timeout = kwargs.pop('timeout', 0)
try:
mgmt_link = self._mgmt_links[node]
except KeyError:
mgmt_link = ManagementOperation(self._session, endpoint=node, **kwargs)
self._mgmt_links[node] = mgmt_link
await mgmt_link.open()
async with self._mgmt_link_lock_async:
try:
mgmt_link = self._mgmt_links[node]
except KeyError:
mgmt_link = ManagementOperation(self._session, endpoint=node, **kwargs)
self._mgmt_links[node] = mgmt_link
await mgmt_link.open()

while not await mgmt_link.ready():
await self._connection.listen(wait=False)
while not await mgmt_link.ready():
await self._connection.listen(wait=False)

operation_type = operation_type or b'empty'
status, description, response = await mgmt_link.execute(
Expand Down Expand Up @@ -724,7 +731,8 @@ async def _client_run_async(self, **kwargs):
:rtype: bool
"""
try:
await self._link.flow()
if self._link.current_link_credit == 0:
await self._link.flow()
await self._connection.listen(wait=self._socket_timeout, **kwargs)
except ValueError:
_logger.info("Timeout reached, closing receiver.", extra=self._network_trace_params)
Expand All @@ -741,6 +749,7 @@ async def _message_received_async(self, frame, message):
:param message: Received message.
:type message: ~pyamqp.message.Message
"""
self._last_activity_timestamp = time.time()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on_receive on a message means there was activity -- this is used to measure timeout

if self._message_received_callback:
await self._message_received_callback(message)
if not self._streaming_receive:
Expand Down Expand Up @@ -829,6 +838,47 @@ async def receive_message_batch_async(self, **kwargs):
**kwargs
)

async def receive_messages_iter_async(self, timeout=None, on_message_received=None):
"""Receive messages by generator. Messages returned in the generator have already been
accepted - if you wish to add logic to accept or reject messages based on custom
criteria, pass in a callback.

:param on_message_received: A callback to process messages as they arrive from the
service. It takes a single argument, a ~pyamqp.message.Message object.
:type on_message_received: callable[~pyamqp.message.Message]
"""
self._message_received_callback = on_message_received
return self._message_generator_async(timeout=timeout)

async def _message_generator_async(self, timeout=None):
"""Iterate over processed messages in the receive queue.

:rtype: generator[~pyamqp.message.Message]
"""
receiving = True
message = None
self._last_activity_timestamp = time.time()
self._timeout_reached = False
self._timeout = timeout if timeout else self._timeout
try:
while receiving and not self._timeout_reached:
if self._timeout > 0:
if time.time() - self._last_activity_timestamp >= self._timeout:
self._timeout_reached = True

if not self._timeout_reached:
receiving = await self.do_work_async()

while not self._received_messages.empty():
message = self._received_messages.get()
self._last_activity_timestamp = time.time()
self._received_messages.task_done()
yield message

finally:
if self._shutdown:
await self.close_async()

@overload
async def settle_messages_async(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ async def _read_frame(self, wait: Union[bool, int, float] = True, **kwargs) -> b
"""
timeout: Optional[Union[int, float]] = None
if wait is False:
timeout = 1 # TODO: What should this default be?
timeout = .2
annatisch marked this conversation as resolved.
Show resolved Hide resolved
elif wait is True:
timeout = None
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def _init_socket(self, socket_settings):
# For uamqp exception parity. Remove later when resolving issue #27128.
exc.filename = self.sslopts
raise exc
self.sock.settimeout(1) # set socket back to non-blocking mode
self.sock.settimeout(.2) # set socket back to non-blocking mode

def _get_tcp_socket_defaults(self, sock): # pylint: disable=no-self-use
tcp_opts = {}
Expand Down
101 changes: 88 additions & 13 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# pylint: disable=too-many-lines
# TODO: Check types of kwargs (issue exists for this)
import logging
import threading
import queue
import time
import uuid
Expand Down Expand Up @@ -163,8 +164,9 @@ def __init__(self, hostname, **kwargs):
self._cbs_authenticator = None
self._auth_timeout = kwargs.pop("auth_timeout", DEFAULT_AUTH_TIMEOUT)
self._mgmt_links = {}
self._mgmt_link_lock = threading.Lock()
self._retry_policy = kwargs.pop("retry_policy", RetryPolicy())
self._keep_alive_interval = int(kwargs.get("keep_alive_interval") or 0)
self._keep_alive_interval = int(kwargs.get("keep_alive_interval", 0))
self._keep_alive_thread = None

# Connection settings
Expand Down Expand Up @@ -217,6 +219,20 @@ def __exit__(self, *args):
"""Close and destroy Client on exiting a context manager."""
self.close()

def _keep_alive(self):
start_time = time.time()
try:
while self._connection and not self._shutdown:
current_time = time.time()
elapsed_time = current_time - start_time
if elapsed_time >= self._keep_alive_interval:
_logger.debug("Keeping %r connection alive.", self.__class__.__name__)
self._connection.listen(wait=self._socket_timeout, batch=self._link.current_link_credit)
start_time = current_time
time.sleep(1)
except Exception as e: # pylint: disable=broad-except
_logger.debug("Connection keep-alive for %r failed: %r.", self.__class__.__name__, e)

def _client_ready(self): # pylint: disable=no-self-use
"""Determine whether the client is ready to start sending and/or
receiving messages. To be ready, the connection must be open and
Expand Down Expand Up @@ -306,6 +322,10 @@ def open(self, connection=None):
outgoing_window=self._outgoing_window,
)
self._session.begin()
if self._keep_alive_interval:
self._keep_alive_thread = threading.Thread(target=self._keep_alive)
self._keep_alive_thread.daemon = True
self._keep_alive_thread.start()
if self._auth.auth_type == AUTH_TYPE_CBS:
self._cbs_authenticator = CBSAuthenticator(
session=self._session, auth=self._auth, auth_timeout=self._auth_timeout
Expand Down Expand Up @@ -339,6 +359,12 @@ def close(self):
if not self._external_connection:
self._connection.close()
self._connection = None
if self._keep_alive_thread:
try:
self._keep_alive_thread.join()
except RuntimeError: # Probably thread failed to start in .open()
logging.debug("Keep alive thread failed to join.", exc_info=True)
self._keep_alive_thread = None
self._network_trace_params["amqpConnection"] = None
self._network_trace_params["amqpSession"] = None

Expand Down Expand Up @@ -409,16 +435,16 @@ def mgmt_request(self, message, **kwargs):
operation_type = kwargs.pop("operation_type", None)
node = kwargs.pop("node", "$management")
timeout = kwargs.pop("timeout", 0)
try:
mgmt_link = self._mgmt_links[node]
except KeyError:
mgmt_link = ManagementOperation(self._session, endpoint=node, **kwargs)
self._mgmt_links[node] = mgmt_link
mgmt_link.open()

while not mgmt_link.ready():
self._connection.listen(wait=False)

with self._mgmt_link_lock:
try:
mgmt_link = self._mgmt_links[node]
except KeyError:
mgmt_link = ManagementOperation(self._session, endpoint=node, **kwargs)
self._mgmt_links[node] = mgmt_link
mgmt_link.open()

while not mgmt_link.ready():
self._connection.listen(wait=False)
operation_type = operation_type or b"empty"
status, description, response = mgmt_link.execute(
message, operation=operation, operation_type=operation_type, timeout=timeout
Expand Down Expand Up @@ -659,7 +685,7 @@ def send_message(self, message, **kwargs):
self._do_retryable_operation(self._send_message_impl, message=message, **kwargs)


class ReceiveClient(AMQPClient):
class ReceiveClient(AMQPClient): # pylint:disable=too-many-instance-attributes
"""
An AMQP client for receiving messages.
:param source: The source AMQP service endpoint. This can either be the URI as
Expand Down Expand Up @@ -762,6 +788,12 @@ def __init__(self, hostname, source, **kwargs):
self._max_message_size = kwargs.pop("max_message_size", MAX_FRAME_SIZE_BYTES)
self._link_properties = kwargs.pop("link_properties", None)
self._link_credit = kwargs.pop("link_credit", 300)

# Iterator
self._timeout = kwargs.pop("timeout", 0)
self._timeout_reached = False
self._last_activity_timestamp = time.time()

super(ReceiveClient, self).__init__(hostname, **kwargs)

def _client_ready(self):
Expand Down Expand Up @@ -799,7 +831,8 @@ def _client_run(self, **kwargs):
:rtype: bool
"""
try:
self._link.flow()
if self._link.current_link_credit == 0:
self._link.flow()
self._connection.listen(wait=self._socket_timeout, **kwargs)
except ValueError:
_logger.info("Timeout reached, closing receiver.", extra=self._network_trace_params)
Expand All @@ -816,6 +849,7 @@ def _message_received(self, frame, message):
:param message: Received message.
:type message: ~pyamqp.message.Message
"""
self._last_activity_timestamp = time.time()
if self._message_received_callback:
self._message_received_callback(message)
if not self._streaming_receive:
Expand Down Expand Up @@ -895,6 +929,47 @@ def receive_message_batch(self, **kwargs):
"""
return self._do_retryable_operation(self._receive_message_batch_impl, **kwargs)

def receive_messages_iter(self, timeout=None, on_message_received=None):
"""Receive messages by generator. Messages returned in the generator have already been
accepted - if you wish to add logic to accept or reject messages based on custom
criteria, pass in a callback.

:param on_message_received: A callback to process messages as they arrive from the
service. It takes a single argument, a ~pyamqp.message.Message object.
:type on_message_received: callable[~pyamqp.message.Message]
"""
self._message_received_callback = on_message_received
return self._message_generator(timeout=timeout)

def _message_generator(self, timeout=None):
"""Iterate over processed messages in the receive queue.

:rtype: generator[~pyamqp.message.Message]
"""
self._timeout_reached = False
receiving = True
message = None
self._last_activity_timestamp = time.time()
self._timeout = timeout if timeout else self._timeout
try:
while receiving and not self._timeout_reached:
if self._timeout > 0:
if time.time() - self._last_activity_timestamp >= self._timeout:
self._timeout_reached = True

if not self._timeout_reached:
receiving = self.do_work()

while not self._received_messages.empty():
message = self._received_messages.get()
self._last_activity_timestamp = time.time()
self._received_messages.task_done()
yield message

finally:
if self._shutdown:
self.close()

@overload
def settle_messages(
self,
Expand Down
Loading