From fc939b38e746280f104c1cd335a721708d6dd32f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Carlos=20Falc=C3=A3o=20Petri?= Date: Thu, 21 Mar 2024 18:21:19 -0300 Subject: [PATCH] [EventHub] Fix race condition when buffered mode is enabled (#34712) * Protects buffer append from race condition * Improves current batch flushing consistency * Update azure-eventhub CHANGELOG.md --- sdk/eventhub/azure-eventhub/CHANGELOG.md | 2 ++ .../_buffered_producer/_buffered_producer.py | 30 ++++++++----------- .../_buffered_producer_async.py | 29 ++++++++---------- 3 files changed, 27 insertions(+), 34 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/CHANGELOG.md b/sdk/eventhub/azure-eventhub/CHANGELOG.md index eab898c3abea..8f0912164eb9 100644 --- a/sdk/eventhub/azure-eventhub/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Fixed a bug where using `EventHubProducerClient` in buffered mode could potentially drop a buffered message without actually sending it. ([#34712](https://github.com/Azure/azure-sdk-for-python/pull/34712)) + ### Other Changes - Updated network trace logging to replace `None` values in AMQP connection info with empty strings as per the OpenTelemetry specification. diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py index 9babfcb7d2bb..1064000e2159 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py @@ -105,24 +105,22 @@ def put_events(self, events, timeout_time=None): raise OperationTimeoutError( "Failed to enqueue events into buffer due to timeout." ) - try: - # add single event into current batch - self._cur_batch.add(events) - except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer - # if there are events in cur_batch, enqueue cur_batch to the buffer - with self._lock: + with self._lock: + try: + # add single event into current batch + self._cur_batch.add(events) + except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer + # if there are events in cur_batch, enqueue cur_batch to the buffer if self._cur_batch: self._buffered_queue.put(self._cur_batch) self._buffered_queue.put(events) - # create a new batch for incoming events - self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) - except ValueError: - # add single event exceeds the cur batch size, create new batch - with self._lock: + # create a new batch for incoming events + self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) + except ValueError: + # add single event exceeds the cur batch size, create new batch self._buffered_queue.put(self._cur_batch) - self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) - self._cur_batch.add(events) - with self._lock: + self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) + self._cur_batch.add(events) self._cur_buffered_len += new_events_len def failsafe_callback(self, callback): @@ -146,6 +144,7 @@ def flush(self, timeout_time=None, raise_error=True): _LOGGER.info("Partition: %r started flushing.", self.partition_id) if self._cur_batch: # if there is batch, enqueue it to the buffer first self._buffered_queue.put(self._cur_batch) + self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) while self._buffered_queue.qsize() > 0: remaining_time = timeout_time - time.time() if timeout_time else None if (remaining_time and remaining_time > 0) or remaining_time is None: @@ -197,9 +196,6 @@ def flush(self, timeout_time=None, raise_error=True): break # after finishing flushing, reset cur batch and put it into the buffer self._last_send_time = time.time() - #reset buffered count - self._cur_buffered_len = 0 - self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) _LOGGER.info("Partition %r finished flushing.", self.partition_id) def check_max_wait_time_worker(self): diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py index 0888aca876e5..4f8468367904 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py @@ -105,24 +105,22 @@ async def put_events(self, events, timeout_time=None): raise OperationTimeoutError( "Failed to enqueue events into buffer due to timeout." ) - try: - # add single event into current batch - self._cur_batch.add(events) - except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer - # if there are events in cur_batch, enqueue cur_batch to the buffer - async with self._lock: + async with self._lock: + try: + # add single event into current batch + self._cur_batch.add(events) + except AttributeError: # if the input events is a EventDataBatch, put the whole into the buffer + # if there are events in cur_batch, enqueue cur_batch to the buffer if self._cur_batch: self._buffered_queue.put(self._cur_batch) self._buffered_queue.put(events) - # create a new batch for incoming events - self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) - except ValueError: - # add single event exceeds the cur batch size, create new batch - async with self._lock: + # create a new batch for incoming events + self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) + except ValueError: + # add single event exceeds the cur batch size, create new batch self._buffered_queue.put(self._cur_batch) - self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) - self._cur_batch.add(events) - async with self._lock: + self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) + self._cur_batch.add(events) self._cur_buffered_len += new_events_len def failsafe_callback(self, callback): @@ -200,9 +198,6 @@ async def _flush(self, timeout_time=None, raise_error=True): break # after finishing flushing, reset cur batch and put it into the buffer self._last_send_time = time.time() - #reset curr_buffered - self._cur_buffered_len = 0 - self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport) _LOGGER.info("Partition %r finished flushing.", self.partition_id) async def check_max_wait_time_worker(self):