From d9943666c3d01fa28dd4111e7bb6ec5ccfb3fef7 Mon Sep 17 00:00:00 2001
From: Antonio Carlos
Date: Fri, 1 Mar 2024 22:24:21 -0300
Subject: [PATCH 1/3] Protects buffer append from race condition

---
 .../_buffered_producer/_buffered_producer.py | 26 +++++++++----------
 .../_buffered_producer_async.py              | 26 +++++++++----------
 2 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py
index 9babfcb7d2bb..d8fca9a4bac9 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py
@@ -105,24 +105,22 @@ def put_events(self, events, timeout_time=None):
                 raise OperationTimeoutError(
                     "Failed to enqueue events into buffer due to timeout."
                 )
-        try:
-            # add single event into current batch
-            self._cur_batch.add(events)
-        except AttributeError:  # if the input events is a EventDataBatch, put the whole into the buffer
-            # if there are events in cur_batch, enqueue cur_batch to the buffer
-            with self._lock:
+        with self._lock:
+            try:
+                # add single event into current batch
+                self._cur_batch.add(events)
+            except AttributeError:  # if the input events is a EventDataBatch, put the whole into the buffer
+                # if there are events in cur_batch, enqueue cur_batch to the buffer
                 if self._cur_batch:
                     self._buffered_queue.put(self._cur_batch)
                 self._buffered_queue.put(events)
-            # create a new batch for incoming events
-            self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
-        except ValueError:
-            # add single event exceeds the cur batch size, create new batch
-            with self._lock:
+                # create a new batch for incoming events
+                self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
+            except ValueError:
+                # add single event exceeds the cur batch size, create new batch
                 self._buffered_queue.put(self._cur_batch)
-            self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
-            self._cur_batch.add(events)
-        with self._lock:
+                self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
+                self._cur_batch.add(events)
             self._cur_buffered_len += new_events_len
 
     def failsafe_callback(self, callback):
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py
index 0888aca876e5..d9891b530e54 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py
@@ -105,24 +105,22 @@ async def put_events(self, events, timeout_time=None):
                 raise OperationTimeoutError(
                     "Failed to enqueue events into buffer due to timeout."
                 )
-        try:
-            # add single event into current batch
-            self._cur_batch.add(events)
-        except AttributeError:  # if the input events is a EventDataBatch, put the whole into the buffer
-            # if there are events in cur_batch, enqueue cur_batch to the buffer
-            async with self._lock:
+        async with self._lock:
+            try:
+                # add single event into current batch
+                self._cur_batch.add(events)
+            except AttributeError:  # if the input events is a EventDataBatch, put the whole into the buffer
+                # if there are events in cur_batch, enqueue cur_batch to the buffer
                 if self._cur_batch:
                     self._buffered_queue.put(self._cur_batch)
                 self._buffered_queue.put(events)
-            # create a new batch for incoming events
-            self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
-        except ValueError:
-            # add single event exceeds the cur batch size, create new batch
-            async with self._lock:
+                # create a new batch for incoming events
+                self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
+            except ValueError:
+                # add single event exceeds the cur batch size, create new batch
                 self._buffered_queue.put(self._cur_batch)
-            self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
-            self._cur_batch.add(events)
-        async with self._lock:
+                self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
+                self._cur_batch.add(events)
             self._cur_buffered_len += new_events_len
 
     def failsafe_callback(self, callback):

From c6479b5011bbed99b9b0fcda318957045cde44e4 Mon Sep 17 00:00:00 2001
From: Antonio Carlos
Date: Fri, 1 Mar 2024 22:29:46 -0300
Subject: [PATCH 2/3] Improves current batch flushing consistency

---
 .../azure/eventhub/_buffered_producer/_buffered_producer.py | 4 +---
 .../aio/_buffered_producer/_buffered_producer_async.py      | 3 ---
 2 files changed, 1 insertion(+), 6 deletions(-)

diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py
index d8fca9a4bac9..1064000e2159 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py
@@ -144,6 +144,7 @@ def flush(self, timeout_time=None, raise_error=True):
         _LOGGER.info("Partition: %r started flushing.", self.partition_id)
         if self._cur_batch:  # if there is batch, enqueue it to the buffer first
             self._buffered_queue.put(self._cur_batch)
+            self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
         while self._buffered_queue.qsize() > 0:
             remaining_time = timeout_time - time.time() if timeout_time else None
             if (remaining_time and remaining_time > 0) or remaining_time is None:
@@ -195,9 +196,6 @@ def flush(self, timeout_time=None, raise_error=True):
                 break
         # after finishing flushing, reset cur batch and put it into the buffer
         self._last_send_time = time.time()
-        #reset buffered count
-        self._cur_buffered_len = 0
-        self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
         _LOGGER.info("Partition %r finished flushing.", self.partition_id)
 
     def check_max_wait_time_worker(self):
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py
index d9891b530e54..4f8468367904 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py
@@ -198,9 +198,6 @@ async def _flush(self, timeout_time=None, raise_error=True):
                 break
         # after finishing flushing, reset cur batch and put it into the buffer
         self._last_send_time = time.time()
-        #reset curr_buffered
-        self._cur_buffered_len = 0
-        self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
         _LOGGER.info("Partition %r finished flushing.", self.partition_id)
 
     async def check_max_wait_time_worker(self):

From 094223caff83b8f1890385900333f5eb25b40336 Mon Sep 17 00:00:00 2001
From: Antonio Carlos
Date: Sat, 9 Mar 2024 17:54:17 -0300
Subject: [PATCH 3/3] Update azure-eventhub CHANGELOG.md

---
 sdk/eventhub/azure-eventhub/CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sdk/eventhub/azure-eventhub/CHANGELOG.md b/sdk/eventhub/azure-eventhub/CHANGELOG.md
index eab898c3abea..8f0912164eb9 100644
--- a/sdk/eventhub/azure-eventhub/CHANGELOG.md
+++ b/sdk/eventhub/azure-eventhub/CHANGELOG.md
@@ -8,6 +8,8 @@
 
 ### Bugs Fixed
 
+- Fixed a bug where using `EventHubProducerClient` in buffered mode could potentially drop a buffered message without actually sending it. ([#34712](https://github.com/Azure/azure-sdk-for-python/pull/34712))
+
 ### Other Changes
 
 - Updated network trace logging to replace `None` values in AMQP connection info with empty strings as per the OpenTelemetry specification.
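
Note, not part of the patch series above: the standalone sketch below illustrates the two patterns the patches adopt. All names in it (ToyBufferedPartition, put_event, drain) are invented for this illustration and do not exist in azure-eventhub; it is a simplified stand-in, not the SDK code. Patch 1 holds the partition lock across the entire try-to-add / rotate-batch sequence in put_events, so a concurrent call can no longer interleave between enqueuing the old batch and installing a fresh one; patch 2 replaces the current batch immediately after flush hands it to the queue, instead of only after the send loop finishes.

import queue
import threading


class ToyBufferedPartition:
    """Minimal stand-in for a buffered partition producer; illustration only."""

    def __init__(self, max_batch_size=100):
        self._lock = threading.Lock()
        self._buffered_queue = queue.Queue()
        self._max_batch_size = max_batch_size
        self._cur_batch = []  # stands in for EventDataBatch

    def put_event(self, event):
        # Patch 1's pattern: the whole add-or-rotate sequence is one atomic step.
        with self._lock:
            if len(self._cur_batch) >= self._max_batch_size:
                # current batch is full: hand it to the queue and start a new one
                self._buffered_queue.put(self._cur_batch)
                self._cur_batch = []
            self._cur_batch.append(event)

    def drain(self):
        # Patch 2's pattern: reset the current batch right after enqueuing it,
        # not only after the send loop has finished.
        sent = []
        with self._lock:
            if self._cur_batch:
                self._buffered_queue.put(self._cur_batch)
                self._cur_batch = []
            while not self._buffered_queue.empty():
                # a real producer would send the batch here
                sent.extend(self._buffered_queue.get())
        return sent


if __name__ == "__main__":
    part = ToyBufferedPartition()
    threads = [
        threading.Thread(target=lambda: [part.put_event(i) for i in range(1000)])
        for _ in range(4)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    events = part.drain()
    assert len(events) == 4000, len(events)  # no event is dropped
    print("drained", len(events), "events")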