Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eventhubs track2 python main issues #5575

Merged
merged 52 commits into from
Jun 3, 2019
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
c3b7e99
Move to under sdk
May 18, 2019
0e88a67
Remove policies
May 19, 2019
f348cf2
Remove debugging files
May 19, 2019
be129cf
Rename Offset to EventPosition
May 19, 2019
0b257ec
make tests a namespace package
May 19, 2019
74d398e
Revised test receive for new code
May 19, 2019
1856bfa
Revised test send for track two
May 19, 2019
9d6403a
Update async code from sync
May 20, 2019
e1dff6b
Revise async receive and send live test for track2
May 20, 2019
9a671c3
Use uamqp 1.2
May 20, 2019
1ab3e1d
Resolve code review feedback
May 20, 2019
3932041
add queue_message to async sender
May 20, 2019
36423f4
send_batch receives both list and iterator
May 21, 2019
d186db7
Merge branch 'eventhubs_sdk' of github.com:Azure/azure-sdk-for-python…
May 21, 2019
c680b6a
Update after adp review
May 23, 2019
805b836
send accepts EventData, list, iteratable
May 24, 2019
2140eec
Event Hub Track 2 (#5)
yunhaoling May 24, 2019
273368f
change epoch to exclusive_receiver_priority
May 24, 2019
e675fc5
Merge branch 'master' of github.com:YijunXieMS/azure-sdk-for-python
May 24, 2019
cec2fdb
fix small problem
May 24, 2019
fa40e94
remove uamqp dependency
May 25, 2019
37938c3
Eventhub track2 (#6)
yunhaoling May 27, 2019
5b7b456
Changes from cross-lang
May 28, 2019
1262a2c
Change debug to network_tracing
May 28, 2019
f8b717e
Sync Client Constructor
May 29, 2019
0e53a5c
auto_reconnect True and keep_alive None
May 29, 2019
fdd3d44
consumer_group $default
May 29, 2019
50e0163
hide open()
May 29, 2019
094ae4e
partition -> partition_id
May 29, 2019
1c3df2f
credentials -> credential in init
May 29, 2019
9863a60
set running=true after opened
May 29, 2019
25848c4
Eventhub track2 - Update livetest (#7)
yunhaoling May 30, 2019
6d8623d
Merge branch 'master' of github.com:YijunXieMS/azure-sdk-for-python
May 30, 2019
d26c967
Add eh error classes
May 30, 2019
309dff1
EventHubError extends AzureError
May 30, 2019
d5ed5cc
Fix EventPosition default value issue
May 30, 2019
ae0ce5f
change $default to $Default
May 30, 2019
e3f9281
Handle TokenAuthError
May 30, 2019
55e5520
wait for ready in _reconnect
May 30, 2019
f7e572d
fix get_partition_ids issue
May 31, 2019
2cb8e30
Fix reconnect issue
May 31, 2019
3ded0ad
small fix
May 31, 2019
d415474
fix async live test
May 31, 2019
5e7d5f7
Eventhub track2 Live test update (#8)
yunhaoling May 31, 2019
56af6f3
debug->network_tracing
May 31, 2019
6a33049
Negative test fix
May 31, 2019
3ec4429
Remove partition_key, send with batching_label
May 31, 2019
343f391
Fix review problems
Jun 1, 2019
c2c2764
Fix a log issue
Jun 2, 2019
4085a42
fix get_partition_properties bug
Jun 2, 2019
1a42901
add client properties live test
Jun 2, 2019
8afabf6
Revised setup.py for track 2
Jun 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,29 @@

__version__ = "1.3.1"

from azure.eventhub.common import EventData, EventHubError, EventPosition
from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, AuthenticationError
from azure.eventhub.client import EventHubClient
from azure.eventhub.sender import Sender
from azure.eventhub.receiver import Receiver
from uamqp.constants import MessageSendResult
from uamqp.constants import TransportType
from .constants import MessageSendResult
from .constants import TransportType
from .common import FIRST_AVAILABLE, NEW_EVENTS_ONLY, SharedKeyCredentials, SASTokenCredentials

__all__ = [
"__version__",
"EventData",
"EventHubError",
"ConnectError",
"EventDataError",
"AuthenticationError",
"EventPosition",
"EventHubClient",
"Sender",
"Receiver",
"MessageSendResult",
"TransportType",
"FIRST_AVAILABLE", "NEW_EVENTS_ONLY",
"SharedKeyCredentials",
"SASTokenCredentials",
]

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
AMQPClientAsync,
)

from azure.eventhub.common import parse_sas_token
from azure.eventhub.common import parse_sas_token, SharedKeyCredentials, SASTokenCredentials
from azure.eventhub import (
EventHubError)
from ..client_abstract import EventHubClientAbstract
Expand Down Expand Up @@ -55,17 +55,19 @@ def _create_auth(self, username=None, password=None):
http_proxy = self.config.http_proxy
transport_type = self.config.transport_type
auth_timeout = self.config.auth_timeout
if self.aad_credential and self.sas_token:
raise ValueError("Can't have both sas_token and aad_credential")

elif self.aad_credential:
get_jwt_token = functools.partial(self.aad_credential.get_token, ['https://eventhubs.azure.net//.default'])
# TODO: should use async aad_credential.get_token. Check with Charles for async identity api
return authentication.JWTTokenAsync(self.auth_uri, self.auth_uri,
get_jwt_token, http_proxy=http_proxy,
transport_type=transport_type)
elif self.sas_token:
token = self.sas_token() if callable(self.sas_token) else self.sas_token
if isinstance(self.credential, SharedKeyCredentials):
username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
return authentication.SASLPlain(
self.host, username, password, http_proxy=http_proxy, transport_type=transport_type)
return authentication.SASTokenAsync.from_shared_access_key(
self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy,
transport_type=transport_type)

elif isinstance(self.credential, SASTokenCredentials):
token = self.credential.get_sas_token()
try:
expiry = int(parse_sas_token(token)['se'])
except (KeyError, TypeError, IndexError):
Expand All @@ -77,15 +79,14 @@ def _create_auth(self, username=None, password=None):
http_proxy=http_proxy,
transport_type=transport_type)

username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
return authentication.SASLPlain(
self.address.hostname, username, password, http_proxy=http_proxy, transport_type=transport_type)
return authentication.SASTokenAsync.from_shared_access_key(
self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy, transport_type=transport_type)
else:
get_jwt_token = functools.partial(self.credential.get_token, ['https://eventhubs.azure.net//.default'])
return authentication.JWTTokenAsync(self.auth_uri, self.auth_uri,
get_jwt_token, http_proxy=http_proxy,
transport_type=transport_type)


async def get_eventhub_information(self):
async def get_properties(self):
"""
Get details on the specified EventHub async.

Expand All @@ -108,27 +109,76 @@ async def get_eventhub_information(self):
eh_info = response.get_data()
output = {}
if eh_info:
output['name'] = eh_info[b'name'].decode('utf-8')
output['type'] = eh_info[b'type'].decode('utf-8')
output['created_at'] = datetime.datetime.fromtimestamp(float(eh_info[b'created_at'])/1000)
output['partition_count'] = eh_info[b'partition_count']
output['path'] = eh_info[b'name'].decode('utf-8')
output['created_at'] = datetime.datetime.utcfromtimestamp(float(eh_info[b'created_at'])/1000)
output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']]
return output
finally:
await mgmt_client.close_async()

async def get_partition_ids(self):
YijunXieMS marked this conversation as resolved.
Show resolved Hide resolved
return (await self.get_properties())['partition_ids']

async def get_partition_properties(self, partition):
"""
Get information on the specified partition async.
Keys in the details dictionary include:

-'name'
-'type'
-'partition'
-'begin_sequence_number'
-'last_enqueued_sequence_number'
-'last_enqueued_offset'
-'last_enqueued_time_utc'
-'is_partition_empty'

:param partition: The target partition id.
:type partition: str
:rtype: dict
"""
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password": self._auth_config.get("iot_password")}
try:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug)
await mgmt_client.open_async()
mgmt_msg = Message(application_properties={'name': self.eh_name,
'partition': partition})
response = await mgmt_client.mgmt_request_async(
mgmt_msg,
constants.READ_OPERATION,
op_type=b'com.microsoft:partition',
status_code_field=b'status-code',
description_fields=b'status-description')
partition_info = response.get_data()
output = {}
if partition_info:
output['event_hub_path'] = partition_info[b'name'].decode('utf-8')
output['id'] = partition_info[b'partition'].decode('utf-8')
output['begin_sequence_number'] = partition_info[b'begin_sequence_number']
output['last_enqueued_sequence_number'] = partition_info[b'last_enqueued_sequence_number']
output['last_enqueued_offset'] = partition_info[b'last_enqueued_offset'].decode('utf-8')
output['last_enqueued_time_utc'] = datetime.datetime.utcfromtimestamp(
float(partition_info[b'last_enqueued_time_utc'] / 1000))
output['is_empty'] = partition_info[b'is_partition_empty']
return output
finally:
await mgmt_client.close_async()

def create_receiver(
self, consumer_group, partition, offset=None, epoch=None, operation=None,
prefetch=None, keep_alive=None, auto_reconnect=None, loop=None):
self, partition_id, consumer_group="$Default", event_position=None, exclusive_receiver_priority=None, operation=None,
prefetch=None, loop=None):
"""
Add an async receiver to the client for a particular consumer group and partition.

:param consumer_group: The name of the consumer group.
YijunXieMS marked this conversation as resolved.
Show resolved Hide resolved
:type consumer_group: str
:param partition: The ID of the partition.
:type partition: str
:param offset: The offset from which to start receiving.
:type offset: ~azure.eventhub.common.Offset
:param event_position: The position from which to start receiving.
:type event_position: ~azure.eventhub.common.EventPosition
:param prefetch: The message prefetch count of the receiver. Default is 300.
:type prefetch: int
:operation: An optional operation to be appended to the hostname in the source URL.
Expand All @@ -145,53 +195,18 @@ def create_receiver(
:caption: Add an async receiver to the client for a particular consumer group and partition.

"""
keep_alive = self.config.keep_alive if keep_alive is None else keep_alive
auto_reconnect = self.config.auto_reconnect if auto_reconnect is None else auto_reconnect
prefetch = self.config.prefetch if prefetch is None else prefetch

path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition)
self.address.hostname, path, consumer_group, partition_id)
handler = Receiver(
self, source_url, offset=offset, epoch=epoch, prefetch=prefetch, keep_alive=keep_alive,
auto_reconnect=auto_reconnect, loop=loop)
self, source_url, offset=event_position, exclusive_receiver_priority=exclusive_receiver_priority,
prefetch=prefetch, loop=loop)
return handler

def create_epoch_receiver(
self, consumer_group, partition, epoch, prefetch=300, operation=None, loop=None):
"""
Add an async receiver to the client with an epoch value. Only a single epoch receiver
can connect to a partition at any given time - additional epoch receivers must have
a higher epoch value or they will be rejected. If a 2nd epoch receiver has
connected, the first will be closed.

:param consumer_group: The name of the consumer group.
:type consumer_group: str
:param partition: The ID of the partition.
:type partition: str
:param epoch: The epoch value for the receiver.
:type epoch: int
:param prefetch: The message prefetch count of the receiver. Default is 300.
:type prefetch: int
:operation: An optional operation to be appended to the hostname in the source URL.
The value must start with `/` character.
:type operation: str
:rtype: ~azure.eventhub.aio.receiver_async.ReceiverAsync

Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
:start-after: [START create_eventhub_client_async_epoch_receiver]
:end-before: [END create_eventhub_client_async_epoch_receiver]
:language: python
:dedent: 4
:caption: Add an async receiver to the client with an epoch value.

"""
return self.create_receiver(consumer_group, partition, epoch=epoch, prefetch=prefetch,
operation=operation, loop=loop)

def create_sender(
self, partition=None, operation=None, send_timeout=None, keep_alive=None, auto_reconnect=None, loop=None):
self, partition_id=None, operation=None, send_timeout=None, loop=None):
"""
Add an async sender to the client to send ~azure.eventhub.common.EventData object
to an EventHub.
Expand Down Expand Up @@ -229,10 +244,7 @@ def create_sender(
if operation:
target = target + operation
send_timeout = self.config.send_timeout if send_timeout is None else send_timeout
keep_alive = self.config.keep_alive if keep_alive is None else keep_alive
auto_reconnect = self.config.auto_reconnect if auto_reconnect is None else auto_reconnect

handler = Sender(
self, target, partition=partition, send_timeout=send_timeout,
keep_alive=keep_alive, auto_reconnect=auto_reconnect, loop=loop)
self, target, partition=partition_id, send_timeout=send_timeout, loop=loop)
return handler
Loading