Skip to content

Commit

Permalink
[EventHubs] Performance improvement (#7533)
Browse files Browse the repository at this point in the history
* performance improvement

* batch offset

* Review feedback

* Remove lazy parse in EventData

* Add annotation assertions
  • Loading branch information
yunhaoling authored Oct 2, 2019
1 parent 2ea6424 commit f26de12
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import uamqp # type: ignore
from uamqp import errors, types, utils # type: ignore
from uamqp import ReceiveClientAsync, Source # type: ignore
import uamqp

from azure.eventhub import EventData, EventPosition
from azure.eventhub.error import EventHubError, ConnectError, _error_handler
Expand Down Expand Up @@ -116,7 +117,7 @@ async def __anext__(self):
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
if self._track_last_enqueued_event_properties:
self._last_enqueued_event_properties = event_data._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = event_data._get_last_enqueued_event_properties() # pylint:disable=protected-access
return event_data
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception)
Expand Down Expand Up @@ -149,6 +150,8 @@ def _create_handler(self):
error_policy=self._retry_policy,
keep_alive_interval=self._keep_alive,
client_name=self._name,
receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete,
auto_complete=False,
properties=self._client._create_properties( # pylint:disable=protected-access
self._client._config.user_agent), # pylint:disable=protected-access
**desired_capabilities, # pylint:disable=protected-access
Expand Down Expand Up @@ -176,12 +179,14 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
timeout=remaining_time_ms)
for message in message_batch:
event_data = EventData._from_message(message) # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset)
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

if data_batch:
self._offset = EventPosition(data_batch[-1].offset)

if self._track_last_enqueued_event_properties and len(data_batch):
self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = data_batch[-1]._get_last_enqueued_event_properties() # pylint:disable=protected-access

return data_batch

Expand Down
63 changes: 31 additions & 32 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,15 @@ def __init__(self, body=None):
:type body: str, bytes or list
"""

self._annotations = {}
self._delivery_annotations = {}
self._app_properties = {}
self._msg_properties = MessageProperties()
self._runtime_info = {}
self._last_enqueued_event_properties = {}
if body and isinstance(body, list):
self.message = Message(body[0], properties=self._msg_properties)
self.message = Message(body[0])
for more in body[1:]:
self.message._body.append(more) # pylint: disable=protected-access
elif body is None:
raise ValueError("EventData cannot be None.")
else:
self.message = Message(body, properties=self._msg_properties)
self.message = Message(body)

def __str__(self):
dic = {
Expand All @@ -108,13 +104,12 @@ def _set_partition_key(self, value):
:param value: The partition key to set.
:type value: str or bytes
"""
annotations = dict(self._annotations)
annotations = dict(self.message.annotations)
annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL] = value
header = MessageHeader()
header.durable = True
self.message.annotations = annotations
self.message.header = header
self._annotations = annotations

def _trace_message(self, parent_span=None):
"""Add tracing information to this message.
Expand Down Expand Up @@ -145,25 +140,30 @@ def _trace_link_message(self, parent_span=None):
if traceparent:
current_span.link(traceparent)

@staticmethod
def _from_message(message):
event_data = EventData(body='')
event_data.message = message
event_data._msg_properties = message.properties
event_data._annotations = message.annotations
event_data._app_properties = message.application_properties
event_data._delivery_annotations = message.delivery_annotations
if event_data._delivery_annotations:
event_data._runtime_info = {
def _get_last_enqueued_event_properties(self):
if self._last_enqueued_event_properties:
return self._last_enqueued_event_properties

if self.message.delivery_annotations:
self._last_enqueued_event_properties = {
"sequence_number":
event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None),
self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None),
"offset":
event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None),
self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None),
"enqueued_time":
event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None),
self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None),
"retrieval_time":
event_data._delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None)
self.message.delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None)
}
return self._last_enqueued_event_properties

return None

@classmethod
def _from_message(cls, message):
# pylint:disable=protected-access
event_data = cls(body='')
event_data.message = message
return event_data

@property
Expand All @@ -173,7 +173,7 @@ def sequence_number(self):
:rtype: int or long
"""
return self._annotations.get(EventData.PROP_SEQ_NUMBER, None)
return self.message.annotations.get(EventData.PROP_SEQ_NUMBER, None)

@property
def offset(self):
Expand All @@ -183,7 +183,7 @@ def offset(self):
:rtype: str
"""
try:
return self._annotations[EventData.PROP_OFFSET].decode('UTF-8')
return self.message.annotations[EventData.PROP_OFFSET].decode('UTF-8')
except (KeyError, AttributeError):
return None

Expand All @@ -194,7 +194,7 @@ def enqueued_time(self):
:rtype: datetime.datetime
"""
timestamp = self._annotations.get(EventData.PROP_TIMESTAMP, None)
timestamp = self.message.annotations.get(EventData.PROP_TIMESTAMP, None)
if timestamp:
return datetime.datetime.utcfromtimestamp(float(timestamp)/1000)
return None
Expand All @@ -207,9 +207,9 @@ def partition_key(self):
:rtype: bytes
"""
try:
return self._annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL]
return self.message.annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL]
except KeyError:
return self._annotations.get(EventData.PROP_PARTITION_KEY, None)
return self.message.annotations.get(EventData.PROP_PARTITION_KEY, None)

@property
def application_properties(self):
Expand All @@ -218,7 +218,7 @@ def application_properties(self):
:rtype: dict
"""
return self._app_properties
return self.message.application_properties

@application_properties.setter
def application_properties(self, value):
Expand All @@ -228,8 +228,7 @@ def application_properties(self, value):
:param value: The application properties for the EventData.
:type value: dict
"""
self._app_properties = value
properties = None if value is None else dict(self._app_properties)
properties = None if value is None else dict(value)
self.message.application_properties = properties

@property
Expand All @@ -239,7 +238,7 @@ def system_properties(self):
:rtype: dict
"""
return self._annotations
return self.message.annotations

@property
def body(self):
Expand Down
11 changes: 8 additions & 3 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import uamqp # type: ignore
from uamqp import types, errors, utils # type: ignore
from uamqp import ReceiveClient, Source # type: ignore
import uamqp

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.error import _error_handler
Expand Down Expand Up @@ -113,7 +114,7 @@ def __next__(self):
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
if self._track_last_enqueued_event_properties:
self._last_enqueued_event_properties = event_data._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = event_data._get_last_enqueued_event_properties() # pylint:disable=protected-access
return event_data
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
Expand Down Expand Up @@ -146,6 +147,8 @@ def _create_handler(self):
error_policy=self._retry_policy,
keep_alive_interval=self._keep_alive,
client_name=self._name,
receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete,
auto_complete=False,
properties=self._client._create_properties( # pylint:disable=protected-access
self._client._config.user_agent), # pylint:disable=protected-access
**desired_capabilities) # pylint:disable=protected-access
Expand All @@ -171,12 +174,14 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
timeout=remaining_time_ms)
for message in message_batch:
event_data = EventData._from_message(message) # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset)
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

if data_batch:
self._offset = EventPosition(data_batch[-1].offset)

if self._track_last_enqueued_event_properties and len(data_batch):
self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = data_batch[-1]._get_last_enqueued_event_properties() # pylint:disable=protected-access

return data_batch

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ async def test_receive_batch_async(connstr_senders):
received = await receiver.receive(max_batch_size=5, timeout=5)
assert len(received) == 5

for event in received:
assert event.system_properties
assert event.sequence_number is not None
assert event.offset
assert event.enqueued_time


async def pump(receiver, sleep=None):
messages = 0
Expand Down
6 changes: 6 additions & 0 deletions sdk/eventhub/azure-eventhubs/tests/test_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ def test_receive_batch(connstr_senders):
received = receiver.receive(max_batch_size=5, timeout=5)
assert len(received) == 5

for event in received:
assert event.system_properties
assert event.sequence_number is not None
assert event.offset
assert event.enqueued_time


@pytest.mark.liveTest
def test_receive_batch_with_app_prop_sync(connstr_senders):
Expand Down

0 comments on commit f26de12

Please sign in to comment.