Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] Performance improvement #7533

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from uamqp import errors, types, utils # type: ignore
from uamqp import ReceiveClientAsync, Source # type: ignore
import uamqp

from azure.eventhub import EventData, EventPosition
from azure.eventhub.error import EventHubError, ConnectError, _error_handler
Expand Down Expand Up @@ -118,7 +119,7 @@ async def __anext__(self):
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
if self._track_last_enqueued_event_properties:
self._last_enqueued_event_properties = event_data._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = event_data._get_runtime_info() # pylint:disable=protected-access
return event_data
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception)
Expand Down Expand Up @@ -153,6 +154,8 @@ def _create_handler(self):
error_policy=self._retry_policy,
keep_alive_interval=self._keep_alive,
client_name=self._name,
receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete,
auto_complete=False,
properties=self._client._create_properties( # pylint:disable=protected-access
self._client._config.user_agent), # pylint:disable=protected-access
desired_capabilities=desired_capabilities, # pylint:disable=protected-access
Expand Down Expand Up @@ -199,12 +202,14 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
timeout=remaining_time_ms)
for message in message_batch:
event_data = EventData._from_message(message) # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset)
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

if data_batch:
self._offset = EventPosition(data_batch[-1].offset)

if self._track_last_enqueued_event_properties and len(data_batch):
self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = data_batch[-1]._get_runtime_info() # pylint:disable=protected-access

return data_batch

Expand Down
60 changes: 49 additions & 11 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(self, body=None, to_device=None):
self._app_properties = {}
self._msg_properties = MessageProperties()
self._runtime_info = {}
self._need_further_parse = False
if to_device:
self._msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device)
if body and isinstance(body, list):
Expand Down Expand Up @@ -152,27 +153,50 @@ def _trace_link_message(self, parent_span=None):
if traceparent:
current_span.link(traceparent)

def _get_runtime_info(self):
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
if self._runtime_info:
return self._runtime_info

if self.message.delivery_annotations:
self._runtime_info = {
"sequence_number":
self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None),
"offset":
self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None),
"enqueued_time":
self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None),
"retrieval_time":
self.message.delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None)
}
return self._runtime_info

return None

@staticmethod
def _from_message(message):
annatisch marked this conversation as resolved.
Show resolved Hide resolved
# pylint:disable=protected-access
event_data = EventData(body='')
event_data.message = message
event_data._msg_properties = message.properties
event_data._annotations = message.annotations
event_data._app_properties = message.application_properties
event_data._delivery_annotations = message.delivery_annotations
if event_data._delivery_annotations:
event_data._runtime_info = {
event_data._need_further_parse = True
return event_data

def _parse_message_properties(self):
self._msg_properties = self.message.properties
self._annotations = self.message.annotations
self._app_properties = self.message.application_properties
self._delivery_annotations = self.message.delivery_annotations
if self._delivery_annotations:
self._runtime_info = {
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
"sequence_number":
event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None),
self._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None),
"offset":
event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None),
self._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None),
"enqueued_time":
event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None),
self._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None),
"retrieval_time":
event_data._delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None)
self._delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None)
}
return event_data
self._need_further_parse = False

@property
def sequence_number(self):
Expand All @@ -181,6 +205,8 @@ def sequence_number(self):

:rtype: int or long
"""
if self._need_further_parse:
self._parse_message_properties()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uamqp already lazily loads properties. I guess another lazy loading won't improve much performance

Copy link
Contributor Author

@yunhaoling yunhaoling Oct 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid I have to do the lazy assignment here.
because the parsing actually takes place when properties of message are assigned to events (the property method of message in uamqp will call parse function).
so we need make the assignment lazy too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can potentially only put the lazy loading into uamqp... we can discuss offline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lazy parsing removed

return self._annotations.get(EventData.PROP_SEQ_NUMBER, None)

@property
Expand All @@ -190,6 +216,8 @@ def offset(self):

:rtype: str
"""
if self._need_further_parse:
self._parse_message_properties()
try:
return self._annotations[EventData.PROP_OFFSET].decode('UTF-8')
except (KeyError, AttributeError):
Expand All @@ -202,6 +230,8 @@ def enqueued_time(self):

:rtype: datetime.datetime
"""
if self._need_further_parse:
self._parse_message_properties()
timestamp = self._annotations.get(EventData.PROP_TIMESTAMP, None)
if timestamp:
return datetime.datetime.utcfromtimestamp(float(timestamp)/1000)
Expand All @@ -215,6 +245,8 @@ def device_id(self):

:rtype: bytes
"""
if self._need_further_parse:
self._parse_message_properties()
return self._annotations.get(EventData.PROP_DEVICE_ID, None)

@property
Expand All @@ -224,6 +256,8 @@ def partition_key(self):

:rtype: bytes
"""
if self._need_further_parse:
self._parse_message_properties()
try:
return self._annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL]
except KeyError:
Expand All @@ -236,6 +270,8 @@ def application_properties(self):

:rtype: dict
"""
if self._need_further_parse:
self._parse_message_properties()
return self._app_properties

@application_properties.setter
Expand All @@ -257,6 +293,8 @@ def system_properties(self):

:rtype: dict
"""
if self._need_further_parse:
self._parse_message_properties()
return self._annotations

@property
Expand Down
11 changes: 8 additions & 3 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from uamqp import types, errors, utils # type: ignore
from uamqp import ReceiveClient, Source # type: ignore
import uamqp

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.error import _error_handler
Expand Down Expand Up @@ -114,7 +115,7 @@ def __next__(self):
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
if self._track_last_enqueued_event_properties:
self._last_enqueued_event_properties = event_data._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = event_data._get_runtime_info() # pylint:disable=protected-access
return event_data
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
Expand Down Expand Up @@ -149,6 +150,8 @@ def _create_handler(self):
error_policy=self._retry_policy,
keep_alive_interval=self._keep_alive,
client_name=self._name,
receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete,
auto_complete=False,
properties=self._client._create_properties( # pylint:disable=protected-access
self._client._config.user_agent), # pylint:disable=protected-access
desired_capabilities=desired_capabilities) # pylint:disable=protected-access
Expand Down Expand Up @@ -193,12 +196,14 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
timeout=remaining_time_ms)
for message in message_batch:
event_data = EventData._from_message(message) # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset)
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

if data_batch:
self._offset = EventPosition(data_batch[-1].offset)

if self._track_last_enqueued_event_properties and len(data_batch):
self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = data_batch[-1]._get_runtime_info() # pylint:disable=protected-access

return data_batch

Expand Down