-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[EventHubs] add amqp switch support (#25965)
This PR is for adding switch support to the `feature/eventhub/pyproto` changes including the uamqp switch from current `main`. fixes #21246 Addressing Anna's comments from uamqp switch PR (#25193) + main changes: - [x] Moving pyamqp logic out to the PyamqpTransport - [x] **Confirmed: The size of encoded pyamqp.Message is larger than uamqp.Message.** I thought otherwise b/c I was adding the header/property objects by default when building the outgoing uamqp message, even if all values inside those are None. I've fixed this. - [ ] Make BatchMessage transport agnostic: #25494 (comment) - Instead, updated EventDataBatch so that it takes an amqp_transport. If EventDataBatch is manually created and uses PyamqpTransport, inside `send_batch()`, if the producer client transport uses UamqpTransport, the BatchMessage corresponding to the client's amqp_transport will be built and sent. - [X] Add `message` property to `EventData`/`EventDataBatch`, which return `LegacyMessage`/`LegacyMessageBatch` from `_pyamqp` for backcompat. - [x] add `connection=None` parameter to `pyamqp.AMQPClient.open()` as per [this discussion](#25494 (comment)) - [x] Add an async SharedConnectionManager in pyamqp: #25494 (comment) Issue created [[here](#25875)] to address the below TODOs in a separate PR: - [x] add TODO in pyamqp that SenderClient should take msg_timeout: #25494 (comment) - [x] add TODO in pyamqp that ReceiveClient should take timeout: #25494 (comment) - [x] add both MAX_MESSAGE_LENGTH_BYTES and MAX_FRAME_SIZE_BYTES to pyamqp: #25494 (comment) TODO: - [ ] fix mypy/pylint issues - made partial progress. fix rest in separate PR for issue: #25936 - [x] investigate how to remove `message` property from public API. - mark as deprecated and log a deprecation warning if accessed
- Loading branch information
Showing
113 changed files
with
9,667 additions
and
3,265 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
13 changes: 13 additions & 0 deletions
13
sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
# -------------------------------------------------------------------------------------------- | ||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||
# Licensed under the MIT License. See License.txt in the project root for license information. | ||
# -------------------------------------------------------------------------------------------- | ||
from ._buffered_producer import BufferedProducer | ||
from ._partition_resolver import PartitionResolver | ||
from ._buffered_producer_dispatcher import BufferedProducerDispatcher | ||
|
||
__all__ = [ | ||
"BufferedProducer", | ||
"PartitionResolver", | ||
"BufferedProducerDispatcher", | ||
] |
218 changes: 218 additions & 0 deletions
218
sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,218 @@ | ||
# -------------------------------------------------------------------------------------------- | ||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||
# Licensed under the MIT License. See License.txt in the project root for license information. | ||
# -------------------------------------------------------------------------------------------- | ||
from __future__ import annotations | ||
import time | ||
import queue | ||
import logging | ||
from threading import RLock | ||
from concurrent.futures import ThreadPoolExecutor | ||
from typing import Optional, Callable, TYPE_CHECKING | ||
|
||
from .._producer import EventHubProducer | ||
from .._common import EventDataBatch | ||
from ..exceptions import OperationTimeoutError | ||
|
||
if TYPE_CHECKING: | ||
from .._transport._base import AmqpTransport | ||
from .._producer_client import SendEventTypes | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class BufferedProducer: | ||
# pylint: disable=too-many-instance-attributes | ||
def __init__( | ||
self, | ||
producer: EventHubProducer, | ||
partition_id: str, | ||
on_success: Callable[["SendEventTypes", Optional[str]], None], | ||
on_error: Callable[["SendEventTypes", Optional[str], Exception], None], | ||
max_message_size_on_link: int, | ||
executor: ThreadPoolExecutor, | ||
*, | ||
amqp_transport: AmqpTransport, | ||
max_buffer_length: int, | ||
max_wait_time: float = 1 | ||
): | ||
self._buffered_queue: queue.Queue = queue.Queue() | ||
self._max_buffer_len = max_buffer_length | ||
self._cur_buffered_len = 0 | ||
self._executor: ThreadPoolExecutor = executor | ||
self._producer: EventHubProducer = producer | ||
self._lock = RLock() | ||
self._max_wait_time = max_wait_time | ||
self._on_success = self.failsafe_callback(on_success) | ||
self._on_error = self.failsafe_callback(on_error) | ||
self._last_send_time = None | ||
self._running = False | ||
self._cur_batch: Optional[EventDataBatch] = None | ||
self._max_message_size_on_link = max_message_size_on_link | ||
self._check_max_wait_time_future = None | ||
self.partition_id = partition_id | ||
self._amqp_transport = amqp_transport | ||
|
||
def start(self): | ||
with self._lock: | ||
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) | ||
self._running = True | ||
if self._max_wait_time: | ||
self._last_send_time = time.time() | ||
self._check_max_wait_time_future = self._executor.submit( | ||
self.check_max_wait_time_worker | ||
) | ||
|
||
def stop(self, flush=True, timeout_time=None, raise_error=False): | ||
self._running = False | ||
if flush: | ||
with self._lock: | ||
self.flush(timeout_time=timeout_time, raise_error=raise_error) | ||
else: | ||
if self._cur_buffered_len: | ||
_LOGGER.warning( | ||
"Shutting down Partition %r. There are still %r events in the buffer which will be lost", | ||
self.partition_id, | ||
self._cur_buffered_len, | ||
) | ||
if self._check_max_wait_time_future: | ||
remain_timeout = timeout_time - time.time() if timeout_time else None | ||
try: | ||
self._check_max_wait_time_future.result(remain_timeout) | ||
except Exception as exc: # pylint: disable=broad-except | ||
_LOGGER.warning( | ||
"Partition %r stopped with error %r", self.partition_id, exc | ||
) | ||
self._producer.close() | ||
|
||
def put_events(self, events, timeout_time=None): | ||
# Put single event or EventDataBatch into the queue. | ||
# This method would raise OperationTimeout if the queue does not have enough space for the input and | ||
# flush cannot finish in timeout. | ||
try: | ||
new_events_len = len(events) | ||
except TypeError: | ||
new_events_len = 1 | ||
if self._max_buffer_len - self._cur_buffered_len < new_events_len: | ||
_LOGGER.info( | ||
"The buffer for partition %r is full. Attempting to flush before adding %r events.", | ||
self.partition_id, | ||
new_events_len, | ||
) | ||
# flush the buffer | ||
self.flush(timeout_time=timeout_time) | ||
if timeout_time and time.time() > timeout_time: | ||
raise OperationTimeoutError( | ||
"Failed to enqueue events into buffer due to timeout." | ||
) | ||
try: | ||
# add single event into current batch | ||
self._cur_batch.add(events) | ||
except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer | ||
# if there are events in cur_batch, enqueue cur_batch to the buffer | ||
with self._lock: | ||
if self._cur_batch: | ||
self._buffered_queue.put(self._cur_batch) | ||
self._buffered_queue.put(events) | ||
# create a new batch for incoming events | ||
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) | ||
except ValueError: | ||
# add single event exceeds the cur batch size, create new batch | ||
with self._lock: | ||
self._buffered_queue.put(self._cur_batch) | ||
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) | ||
self._cur_batch.add(events) | ||
with self._lock: | ||
self._cur_buffered_len += new_events_len | ||
|
||
def failsafe_callback(self, callback): | ||
def wrapper_callback(*args, **kwargs): | ||
try: | ||
callback(*args, **kwargs) | ||
except Exception as exc: # pylint: disable=broad-except | ||
_LOGGER.warning( | ||
"On partition %r, callback %r encountered exception %r", | ||
callback.__name__, | ||
exc, | ||
self.partition_id, | ||
) | ||
|
||
return wrapper_callback | ||
|
||
def flush(self, timeout_time=None, raise_error=True): | ||
# pylint: disable=protected-access | ||
# try flushing all the buffered batch within given time | ||
with self._lock: | ||
_LOGGER.info("Partition: %r started flushing.", self.partition_id) | ||
if self._cur_batch: # if there is batch, enqueue it to the buffer first | ||
self._buffered_queue.put(self._cur_batch) | ||
while self._buffered_queue.qsize() > 0: | ||
remaining_time = timeout_time - time.time() if timeout_time else None | ||
if (remaining_time and remaining_time > 0) or remaining_time is None: | ||
try: | ||
batch = self._buffered_queue.get(block=False) | ||
except queue.Empty: | ||
break | ||
self._buffered_queue.task_done() | ||
try: | ||
_LOGGER.info("Partition %r is sending.", self.partition_id) | ||
self._producer.send( | ||
batch, | ||
timeout=timeout_time - time.time() | ||
if timeout_time | ||
else None, | ||
) | ||
_LOGGER.info( | ||
"Partition %r sending %r events succeeded.", | ||
self.partition_id, | ||
len(batch), | ||
) | ||
self._on_success(batch._internal_events, self.partition_id) | ||
except Exception as exc: # pylint: disable=broad-except | ||
_LOGGER.info( | ||
"Partition %r sending %r events failed due to exception: %r ", | ||
self.partition_id, | ||
len(batch), | ||
exc, | ||
) | ||
self._on_error(batch._internal_events, self.partition_id, exc) | ||
finally: | ||
self._cur_buffered_len -= len(batch) | ||
else: | ||
_LOGGER.info( | ||
"Partition %r fails to flush due to timeout.", self.partition_id | ||
) | ||
if raise_error: | ||
raise OperationTimeoutError( | ||
"Failed to flush {!r} within {}".format( | ||
self.partition_id, timeout_time | ||
) | ||
) | ||
break | ||
# after finishing flushing, reset cur batch and put it into the buffer | ||
self._last_send_time = time.time() | ||
#reset buffered count | ||
self._cur_buffered_len = 0 | ||
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) | ||
_LOGGER.info("Partition %r finished flushing.", self.partition_id) | ||
|
||
def check_max_wait_time_worker(self): | ||
while self._running: | ||
if self._cur_buffered_len > 0: | ||
now_time = time.time() | ||
_LOGGER.info( | ||
"Partition %r worker is checking max_wait_time.", self.partition_id | ||
) | ||
# flush the partition if the producer is running beyond the waiting time | ||
# or the buffer is at max capacity | ||
if (now_time - self._last_send_time > self._max_wait_time) or ( | ||
self._cur_buffered_len >= self._max_buffer_len | ||
): | ||
# in the worker, not raising error for flush, users can not handle this | ||
with self._lock: | ||
self.flush(raise_error=False) | ||
time.sleep(min(self._max_wait_time, 5)) | ||
|
||
@property | ||
def buffered_event_count(self): | ||
return self._cur_buffered_len |
Oops, something went wrong.