From d9d21501946744c5668acd4dae3dd421c1b83019 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Tue, 15 Feb 2022 14:03:31 -0800 Subject: [PATCH 1/8] stop spawning too much coroutines --- .../eventhub/_pyamqp/aio/_client_async.py | 26 ++++++++++++++++++- .../eventhub/_pyamqp/aio/_connection_async.py | 24 +++++++++++------ .../azure/eventhub/_pyamqp/client.py | 2 ++ 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py index e1b88b192690..c00eed990c08 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py @@ -115,6 +115,8 @@ class AMQPClientAsync(AMQPClientSync): Default is 'UTF-8' :type encoding: str """ + def __init__(self, hostname, auth=None, **kwargs): + super(AMQPClientAsync, self).__init__(hostname=hostname, auth=auth, **kwargs) async def __aenter__(self): """Run Client in an async context manager.""" @@ -136,7 +138,7 @@ async def _client_ready_async(self): # pylint: disable=no-self-use async def _client_run_async(self, **kwargs): """Perform a single Connection iteration.""" - await self._connection.listen(wait=self._socket_timeout) + await self._connection.listen(wait=self._socket_timeout, **kwargs) async def _close_link_async(self, **kwargs): if self._link and not self._link._is_closed: @@ -176,6 +178,23 @@ async def _do_retryable_operation_async(self, operation, *args, **kwargs): absolute_timeout -= (end_time - start_time) raise retry_settings['history'][-1] + async def _keep_alive_async(self): + interval = 10 if self._keep_alive is True else self._keep_alive + start_time = time.time() + try: + while self._connection and not self._shutdown: + current_time = time.time() + elapsed_time = (current_time - start_time) + if elapsed_time >= interval: + _logger.info("Keeping %r connection alive. %r", + self.__class__.__name__, + self._connection._container_id) + await asyncio.shield(self._connection._get_remote_timeout(current_time)) + start_time = current_time + await asyncio.sleep(1) + except Exception as e: # pylint: disable=broad-except + _logger.info("Connection keep-alive for %r failed: %r.", self.__class__.__name__, e) + async def open_async(self): """Asynchronously open the client. The client can create a new Connection or an existing Connection can be passed in. This existing Connection @@ -217,6 +236,8 @@ async def open_async(self): auth_timeout=self._auth_timeout ) await self._cbs_authenticator.open() + if self._keep_alive: + self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async()) self._shutdown = False async def close_async(self): @@ -228,6 +249,9 @@ async def close_async(self): self._shutdown = True if not self._session: return # already closed. + if self._keep_alive_thread: + await self._keep_alive_thread + self._keep_alive_thread = None await self._close_link_async(close=True) if self._cbs_authenticator: await self._cbs_authenticator.close() diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py index ffa8d271d0bb..845305868d39 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py @@ -126,7 +126,9 @@ async def _set_state(self, new_state): self.state = new_state _LOGGER.info("Connection '%s' state changed: %r -> %r", self._container_id, previous_state, new_state) - await asyncio.gather(*[session._on_connection_state_change() for session in self.outgoing_endpoints.values()]) + for session in self.outgoing_endpoints.values(): + await session._on_connection_state_change() + #await asyncio.gather(*[session._on_connection_state_change() for session in self.outgoing_endpoints.values()]) async def _connect(self): try: @@ -162,7 +164,8 @@ def _can_read(self): return self.state not in (ConnectionState.CLOSE_RCVD, ConnectionState.END) async def _read_frame(self, **kwargs): - if self._can_read(): + #if self._can_read(): + if self.state not in (ConnectionState.CLOSE_RCVD, ConnectionState.END): return await self.transport.receive_frame(**kwargs) _LOGGER.warning("Cannot read frame in current state: %r", self.state) @@ -205,11 +208,11 @@ def _get_next_outgoing_channel(self): async def _outgoing_empty(self): if self.network_trace: - _LOGGER.info("<- empty()", extra=self.network_trace_params) + _LOGGER.info("-> empty()", extra=self.network_trace_params) try: if self._can_write(): await self.transport.write(EMPTY_FRAME) - self._last_frame_sent_time = time.time() + self.last_frame_sent_time = time.time() except (OSError, IOError, SSLError, socket.error) as exc: self._error = AMQPConnectionError( ErrorCondition.SocketError, @@ -451,11 +454,16 @@ async def listen(self, wait=False, batch=1, **kwargs): ) return try: - tasks = [asyncio.ensure_future(self._listen_one_frame(**kwargs)) for _ in range(batch)] - await asyncio.gather(*tasks) + for _ in range(batch): + await self._listen_one_frame(**kwargs) except ValueError: - for task in tasks: - task.cancel() + pass + # try: + # tasks = [asyncio.ensure_future(self._listen_one_frame(**kwargs)) for _ in range(batch)] + # await asyncio.gather(*tasks) + # except ValueError: + # for task in tasks: + # task.cancel() except (OSError, IOError, SSLError, socket.error) as exc: self._error = AMQPConnectionError( ErrorCondition.SocketError, diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py index 09d3303a2698..51f2b41a75a2 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py @@ -137,6 +137,8 @@ def __init__(self, hostname, auth=None, **kwargs): self._auth_timeout = kwargs.pop("auth_timeout", DEFAULT_AUTH_TIMEOUT) self._mgmt_links = {} self._retry_policy = kwargs.pop("retry_policy", RetryPolicy()) + self._keep_alive = kwargs.pop("keep_alive", False) + self._keep_alive_thread = None # Connection settings self._max_frame_size = kwargs.pop('max_frame_size', None) or MAX_FRAME_SIZE_BYTES From 153592d9803ea196268aed98140b8742a4d7685f Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Tue, 15 Feb 2022 17:14:51 -0800 Subject: [PATCH 2/8] improve send --- .../azure/eventhub/_pyamqp/aio/_connection_async.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py index 845305868d39..d763d45cf00e 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py @@ -455,7 +455,8 @@ async def listen(self, wait=False, batch=1, **kwargs): return try: for _ in range(batch): - await self._listen_one_frame(**kwargs) + await asyncio.ensure_future(self._listen_one_frame(**kwargs)) + # await self._listen_one_frame(**kwargs) except ValueError: pass # try: From 9a8031daead9fb42b4a340414a008e0bcad47885 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Wed, 16 Feb 2022 13:58:22 -0800 Subject: [PATCH 3/8] async recv perf improvement --- .../eventhub/_pyamqp/aio/_transport_async.py | 8 +- .../azure/eventhub/aio/_consumer_async.py | 111 ++++++++++-------- .../asynctests/test_reconnect_async.py | 12 +- 3 files changed, 69 insertions(+), 62 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py index 61b7b72b3c55..acbdd8af8e76 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py @@ -276,13 +276,7 @@ async def _read(self, toread, initial=False, buffer=None, try: while toread: try: - # TODO: await self.reader.readexactly would not return until it has received something which - # is problematic in the case timeout is required while no frame coming in. - # asyncio.wait_for is used here for timeout control - # set socket timeout does not work, not triggering socket error maybe should be a different config? - # also we could consider using a low level socket instead of high level reader/writer - # https://docs.python.org/3/library/asyncio-eventloop.html - view[nbytes:nbytes + toread] = await asyncio.wait_for(self.reader.readexactly(toread), timeout=1) + view[nbytes:nbytes + toread] = await self.reader.readexactly(toread) nbytes = toread except asyncio.IncompleteReadError as exc: pbytes = len(exc.partial) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py index f38064221fc7..f3462555f1bd 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py @@ -125,6 +125,9 @@ def __init__(self, client: "EventHubConsumerClient", source: str, **kwargs) -> N ) self._message_buffer = deque() # type: Deque[Message] self._last_received_event = None # type: Optional[EventData] + self._message_buffer_lock = asyncio.Lock() + self._last_callback_called_time = None + self._callback_task_run = None def _create_handler(self, auth: "JWTTokenAuthAsync") -> None: source = Source(self._source, filters={}) @@ -162,7 +165,8 @@ async def _open_with_retry(self) -> None: await self._do_retryable_operation(self._open, operation_need_param=False) async def _message_received(self, message: Message) -> None: - self._message_buffer.append(message) + async with self._message_buffer_lock: + self._message_buffer.append(message) def _next_message_in_buffer(self): # pylint:disable=protected-access @@ -171,54 +175,67 @@ def _next_message_in_buffer(self): self._last_received_event = event_data return event_data - async def receive(self, batch=False, max_batch_size=300, max_wait_time=None) -> None: + async def _callback_coroutine(self, batch, max_batch_size, max_wait_time): + while self._callback_task_run: + try: + async with self._message_buffer_lock: + messages = [ + self._message_buffer.popleft() for _ in range(min(max_batch_size, len(self._message_buffer))) + ] + events = [EventData._from_message(message) for message in messages] + now_time = time.time() + if len(events) > 0: + await self._on_event_received(events if batch else events[0]) + self._last_callback_called_time = now_time + else: + if max_wait_time and (now_time - self._last_callback_called_time) > max_wait_time: + # no events received, and need to callback + await self._on_event_received([] if batch else None) + self._last_callback_called_time = now_time + # backoff a bit to avoid throttling CPU when no events are coming + await asyncio.sleep(0.05) + except asyncio.CancelledError: + raise + + async def _receive_coroutine(self): max_retries = ( self._client._config.max_retries # pylint:disable=protected-access ) - has_not_fetched_once = True # ensure one trip when max_wait_time is very small - deadline = time.time() + (max_wait_time or 0) # max_wait_time can be None - while len(self._message_buffer) < max_batch_size and \ - (time.time() < deadline or has_not_fetched_once): - retried_times = 0 - has_not_fetched_once = False - while retried_times <= max_retries: - try: - await self._open() - await cast(ReceiveClientAsync, self._handler).do_work_async(batch=self._prefetch) - break - except asyncio.CancelledError: # pylint: disable=try-except-raise - raise - except Exception as exception: # pylint: disable=broad-except - if ( + retried_times = 0 + while retried_times <= max_retries: + try: + await self._open() + await cast(ReceiveClientAsync, self._handler).do_work_async(batch=self._prefetch) + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception as exception: # pylint: disable=broad-except + if ( isinstance(exception, error.AMQPLinkError) and exception.condition == error.ErrorCondition.LinkStolen # pylint: disable=no-member - ): - raise await self._handle_exception(exception) - if not self.running: # exit by close - return - if self._last_received_event: - self._offset = self._last_received_event.offset - last_exception = await self._handle_exception(exception) - retried_times += 1 - if retried_times > max_retries: - _LOGGER.info( - "%r operation has exhausted retry. Last exception: %r.", - self._name, - last_exception, - ) - raise last_exception - - if self._message_buffer: - while self._message_buffer: - if batch: - events_for_callback = [] # type: List[EventData] - for _ in range(min(max_batch_size, len(self._message_buffer))): - events_for_callback.append(self._next_message_in_buffer()) - await self._on_event_received(events_for_callback) - else: - await self._on_event_received(self._next_message_in_buffer()) - elif max_wait_time: - if batch: - await self._on_event_received([]) - else: - await self._on_event_received(None) + ): + raise await self._handle_exception(exception) + if not self.running: # exit by close + return + if self._last_received_event: + self._offset = self._last_received_event.offset + last_exception = await self._handle_exception(exception) + retried_times += 1 + if retried_times > max_retries: + _LOGGER.info( + "%r operation has exhausted retry. Last exception: %r.", + self._name, + last_exception, + ) + raise last_exception + + async def receive(self, batch=False, max_batch_size=300, max_wait_time=None) -> None: + self._callback_task_run = True + self._last_callback_called_time = time.time() + callback_task = asyncio.ensure_future(self._callback_coroutine(batch, max_batch_size, max_wait_time)) + receive_task = asyncio.ensure_future(self._receive_coroutine()) + + try: + await receive_task + finally: + self._callback_task_run = False + await callback_task diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_reconnect_async.py b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_reconnect_async.py index 5c04c46f2879..38a1690817cc 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_reconnect_async.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_reconnect_async.py @@ -127,13 +127,9 @@ async def on_event_received(event): await consumer._handler.do_work_async() assert consumer._handler._connection.state == constants.ConnectionState.END - duration = 10 - now_time = time.time() - end_time = now_time + duration - - while now_time < end_time: - await consumer.receive() - await asyncio.sleep(0.01) - now_time = time.time() + try: + await asyncio.wait_for(consumer.receive(), timeout=10) + except asyncio.TimeoutError: + pass assert on_event_received.event.body_as_str() == "Event" From 77e882050faa340382288797b2c23a59bccead7c Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Thu, 17 Feb 2022 22:58:32 -0800 Subject: [PATCH 4/8] async perf improve --- .../eventhub/_pyamqp/aio/_client_async.py | 26 +------------------ .../eventhub/_pyamqp/aio/_connection_async.py | 11 +------- .../azure/eventhub/_pyamqp/client.py | 2 -- 3 files changed, 2 insertions(+), 37 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py index c00eed990c08..e1b88b192690 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py @@ -115,8 +115,6 @@ class AMQPClientAsync(AMQPClientSync): Default is 'UTF-8' :type encoding: str """ - def __init__(self, hostname, auth=None, **kwargs): - super(AMQPClientAsync, self).__init__(hostname=hostname, auth=auth, **kwargs) async def __aenter__(self): """Run Client in an async context manager.""" @@ -138,7 +136,7 @@ async def _client_ready_async(self): # pylint: disable=no-self-use async def _client_run_async(self, **kwargs): """Perform a single Connection iteration.""" - await self._connection.listen(wait=self._socket_timeout, **kwargs) + await self._connection.listen(wait=self._socket_timeout) async def _close_link_async(self, **kwargs): if self._link and not self._link._is_closed: @@ -178,23 +176,6 @@ async def _do_retryable_operation_async(self, operation, *args, **kwargs): absolute_timeout -= (end_time - start_time) raise retry_settings['history'][-1] - async def _keep_alive_async(self): - interval = 10 if self._keep_alive is True else self._keep_alive - start_time = time.time() - try: - while self._connection and not self._shutdown: - current_time = time.time() - elapsed_time = (current_time - start_time) - if elapsed_time >= interval: - _logger.info("Keeping %r connection alive. %r", - self.__class__.__name__, - self._connection._container_id) - await asyncio.shield(self._connection._get_remote_timeout(current_time)) - start_time = current_time - await asyncio.sleep(1) - except Exception as e: # pylint: disable=broad-except - _logger.info("Connection keep-alive for %r failed: %r.", self.__class__.__name__, e) - async def open_async(self): """Asynchronously open the client. The client can create a new Connection or an existing Connection can be passed in. This existing Connection @@ -236,8 +217,6 @@ async def open_async(self): auth_timeout=self._auth_timeout ) await self._cbs_authenticator.open() - if self._keep_alive: - self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async()) self._shutdown = False async def close_async(self): @@ -249,9 +228,6 @@ async def close_async(self): self._shutdown = True if not self._session: return # already closed. - if self._keep_alive_thread: - await self._keep_alive_thread - self._keep_alive_thread = None await self._close_link_async(close=True) if self._cbs_authenticator: await self._cbs_authenticator.close() diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py index d763d45cf00e..3c15f225b522 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py @@ -128,7 +128,6 @@ async def _set_state(self, new_state): for session in self.outgoing_endpoints.values(): await session._on_connection_state_change() - #await asyncio.gather(*[session._on_connection_state_change() for session in self.outgoing_endpoints.values()]) async def _connect(self): try: @@ -164,8 +163,7 @@ def _can_read(self): return self.state not in (ConnectionState.CLOSE_RCVD, ConnectionState.END) async def _read_frame(self, **kwargs): - #if self._can_read(): - if self.state not in (ConnectionState.CLOSE_RCVD, ConnectionState.END): + if self._can_read(): return await self.transport.receive_frame(**kwargs) _LOGGER.warning("Cannot read frame in current state: %r", self.state) @@ -456,15 +454,8 @@ async def listen(self, wait=False, batch=1, **kwargs): try: for _ in range(batch): await asyncio.ensure_future(self._listen_one_frame(**kwargs)) - # await self._listen_one_frame(**kwargs) except ValueError: pass - # try: - # tasks = [asyncio.ensure_future(self._listen_one_frame(**kwargs)) for _ in range(batch)] - # await asyncio.gather(*tasks) - # except ValueError: - # for task in tasks: - # task.cancel() except (OSError, IOError, SSLError, socket.error) as exc: self._error = AMQPConnectionError( ErrorCondition.SocketError, diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py index 51f2b41a75a2..09d3303a2698 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py @@ -137,8 +137,6 @@ def __init__(self, hostname, auth=None, **kwargs): self._auth_timeout = kwargs.pop("auth_timeout", DEFAULT_AUTH_TIMEOUT) self._mgmt_links = {} self._retry_policy = kwargs.pop("retry_policy", RetryPolicy()) - self._keep_alive = kwargs.pop("keep_alive", False) - self._keep_alive_thread = None # Connection settings self._max_frame_size = kwargs.pop('max_frame_size', None) or MAX_FRAME_SIZE_BYTES From 32d1cd4357ef439a926b51574ff81ca24398f26c Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Wed, 2 Mar 2022 17:59:15 -0800 Subject: [PATCH 5/8] update version --- sdk/eventhub/azure-eventhub/azure/eventhub/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py index 3d19c8d056c4..13c5ec5b035e 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py @@ -3,4 +3,4 @@ # Licensed under the MIT License. # ------------------------------------ -VERSION = "5.8.0b3" +VERSION = "5.8.0a3" From 5e264f79f931a9bd848e2135247d48861f30a86c Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Thu, 3 Mar 2022 16:05:44 -0800 Subject: [PATCH 6/8] align with sync imple --- .../azure/eventhub/_pyamqp/aio/_connection_async.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py index 3c15f225b522..fa5193f5e28f 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py @@ -422,8 +422,7 @@ async def _wait_for_response(self, wait, end_state): async def _listen_one_frame(self, **kwargs): new_frame = await self._read_frame(**kwargs) - if await self._process_incoming_frame(*new_frame): - raise ValueError("Stop") # Stop listening + return await self._process_incoming_frame(*new_frame) async def listen(self, wait=False, batch=1, **kwargs): try: @@ -451,11 +450,9 @@ async def listen(self, wait=False, batch=1, **kwargs): description="Connection was already closed." ) return - try: - for _ in range(batch): - await asyncio.ensure_future(self._listen_one_frame(**kwargs)) - except ValueError: - pass + for _ in range(batch): + if await asyncio.ensure_future(self._listen_one_frame(**kwargs)): + break except (OSError, IOError, SSLError, socket.error) as exc: self._error = AMQPConnectionError( ErrorCondition.SocketError, From 820d6506035e52bc20c2cd772a184b9f9073dee5 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Thu, 3 Mar 2022 16:11:10 -0800 Subject: [PATCH 7/8] update method name --- .../azure-eventhub/azure/eventhub/aio/_consumer_async.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py index f3462555f1bd..2157c78ac6c5 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py @@ -175,7 +175,7 @@ def _next_message_in_buffer(self): self._last_received_event = event_data return event_data - async def _callback_coroutine(self, batch, max_batch_size, max_wait_time): + async def _callback_task(self, batch, max_batch_size, max_wait_time): while self._callback_task_run: try: async with self._message_buffer_lock: @@ -197,7 +197,7 @@ async def _callback_coroutine(self, batch, max_batch_size, max_wait_time): except asyncio.CancelledError: raise - async def _receive_coroutine(self): + async def _receive_task(self): max_retries = ( self._client._config.max_retries # pylint:disable=protected-access ) @@ -231,8 +231,8 @@ async def _receive_coroutine(self): async def receive(self, batch=False, max_batch_size=300, max_wait_time=None) -> None: self._callback_task_run = True self._last_callback_called_time = time.time() - callback_task = asyncio.ensure_future(self._callback_coroutine(batch, max_batch_size, max_wait_time)) - receive_task = asyncio.ensure_future(self._receive_coroutine()) + callback_task = asyncio.ensure_future(self._callback_task(batch, max_batch_size, max_wait_time)) + receive_task = asyncio.ensure_future(self._receive_task()) try: await receive_task From 0374f0bf169b831fe5913e1dc63a6d1e1b50c1d6 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Thu, 3 Mar 2022 16:31:43 -0800 Subject: [PATCH 8/8] remove redundant except catch --- .../azure/eventhub/aio/_consumer_async.py | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py index 2157c78ac6c5..d5be74195636 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py @@ -177,25 +177,22 @@ def _next_message_in_buffer(self): async def _callback_task(self, batch, max_batch_size, max_wait_time): while self._callback_task_run: - try: - async with self._message_buffer_lock: - messages = [ - self._message_buffer.popleft() for _ in range(min(max_batch_size, len(self._message_buffer))) - ] - events = [EventData._from_message(message) for message in messages] - now_time = time.time() - if len(events) > 0: - await self._on_event_received(events if batch else events[0]) + async with self._message_buffer_lock: + messages = [ + self._message_buffer.popleft() for _ in range(min(max_batch_size, len(self._message_buffer))) + ] + events = [EventData._from_message(message) for message in messages] + now_time = time.time() + if len(events) > 0: + await self._on_event_received(events if batch else events[0]) + self._last_callback_called_time = now_time + else: + if max_wait_time and (now_time - self._last_callback_called_time) > max_wait_time: + # no events received, and need to callback + await self._on_event_received([] if batch else None) self._last_callback_called_time = now_time - else: - if max_wait_time and (now_time - self._last_callback_called_time) > max_wait_time: - # no events received, and need to callback - await self._on_event_received([] if batch else None) - self._last_callback_called_time = now_time - # backoff a bit to avoid throttling CPU when no events are coming - await asyncio.sleep(0.05) - except asyncio.CancelledError: - raise + # backoff a bit to avoid throttling CPU when no events are coming + await asyncio.sleep(0.05) async def _receive_task(self): max_retries = (