Skip to content

Commit

Permalink
Error hierarchy, sample code and docstring (#5743)
Browse files Browse the repository at this point in the history
* Recover from fork repo

* Packaging update of azure-eventhubs

* Fix error message

* update iterator example

* Revert "Packaging update of azure-eventhubs"

This reverts commit 56fc4f0.

* disable autorest auto update

* Sender/Receiver -> EventSender/Receiver

* Change _batching_label back to partition_key

* Remove transfer examples

* move async to async folder

* Update docstring string, sample codes and test codes (#5793)

* catch and process LinkRedirect

* Add receiver iterator pytest

* small fix of iterator example

* add retrieval_time to partition prop

* fix open and re-send bugs

* small fixes

* fix reconnect test case

* close iterator when closing receiver

* Misc changes for code review fix

* client.py type hints

* catch KeyboardInterrupt

* add next() for 2.7 iterator

* raise KeyboardInterrupt instead of exit()
  • Loading branch information
YijunXieMS authored Jun 14, 2019
1 parent 24219aa commit b77d019
Show file tree
Hide file tree
Showing 58 changed files with 1,867 additions and 1,547 deletions.
22 changes: 12 additions & 10 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,33 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "1.3.1"
__version__ = "2.0.0-preview.1"

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, AuthenticationError
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
AuthenticationError, EventDataSendError, ConnectionLostError
from azure.eventhub.client import EventHubClient
from azure.eventhub.sender import Sender
from azure.eventhub.receiver import Receiver
from azure.eventhub.sender import EventSender
from azure.eventhub.receiver import EventReceiver
from .constants import MessageSendResult
from .constants import TransportType
from .common import FIRST_AVAILABLE, NEW_EVENTS_ONLY, SharedKeyCredentials, SASTokenCredentials
from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential

__all__ = [
"__version__",
"EventData",
"EventHubError",
"ConnectError",
"ConnectionLostError",
"EventDataError",
"EventDataSendError",
"AuthenticationError",
"EventPosition",
"EventHubClient",
"Sender",
"Receiver",
"EventSender",
"EventReceiver",
"MessageSendResult",
"TransportType",
"FIRST_AVAILABLE", "NEW_EVENTS_ONLY",
"SharedKeyCredentials",
"SASTokenCredentials",
"EventHubSharedKeyCredential",
"EventHubSASTokenCredential",
]
8 changes: 4 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from .event_hubs_client_async import EventHubClient
from .receiver_async import Receiver
from .sender_async import Sender
from .receiver_async import EventReceiver
from .sender_async import EventSender

__all__ = [
"EventHubClient",
"Receiver",
"Sender"
"EventReceiver",
"EventSender"
]
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
AMQPClientAsync,
)

from azure.eventhub.common import parse_sas_token, SharedKeyCredentials, SASTokenCredentials
from azure.eventhub.common import parse_sas_token, EventPosition, EventHubSharedKeyCredential, EventHubSASTokenCredential
from azure.eventhub import (
EventHubError)
from ..client_abstract import EventHubClientAbstract

from .sender_async import Sender
from .receiver_async import Receiver
from .sender_async import EventSender
from .receiver_async import EventReceiver


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,7 +56,7 @@ def _create_auth(self, username=None, password=None):
transport_type = self.config.transport_type
auth_timeout = self.config.auth_timeout

if isinstance(self.credential, SharedKeyCredentials):
if isinstance(self.credential, EventHubSharedKeyCredential):
username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
Expand All @@ -66,7 +66,7 @@ def _create_auth(self, username=None, password=None):
self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy,
transport_type=transport_type)

elif isinstance(self.credential, SASTokenCredentials):
elif isinstance(self.credential, EventHubSASTokenCredential):
token = self.credential.get_sas_token()
try:
expiry = int(parse_sas_token(token)['se'])
Expand All @@ -85,10 +85,14 @@ def _create_auth(self, username=None, password=None):
get_jwt_token, http_proxy=http_proxy,
transport_type=transport_type)


async def get_properties(self):
"""
Get details on the specified EventHub async.
Get properties of the specified EventHub async.
Keys in the details dictionary include:
-'path'
-'created_at'
-'partition_ids'
:rtype: dict
"""
Expand Down Expand Up @@ -117,21 +121,25 @@ async def get_properties(self):
await mgmt_client.close_async()

async def get_partition_ids(self):
"""
Get partition ids of the specified EventHub async.
:rtype: list[str]
"""
return (await self.get_properties())['partition_ids']

async def get_partition_properties(self, partition):
"""
Get information on the specified partition async.
Get properties of the specified partition async.
Keys in the details dictionary include:
-'name'
-'type'
-'partition'
-'begin_sequence_number'
-'event_hub_path'
-'id'
-'beginning_sequence_number'
-'last_enqueued_sequence_number'
-'last_enqueued_offset'
-'last_enqueued_time_utc'
-'is_partition_empty'
-'is_empty'
:param partition: The target partition id.
:type partition: str
Expand Down Expand Up @@ -168,23 +176,27 @@ async def get_partition_properties(self, partition):
await mgmt_client.close_async()

def create_receiver(
self, partition_id, consumer_group="$Default", event_position=None, exclusive_receiver_priority=None, operation=None,
prefetch=None, loop=None):
self, partition_id, consumer_group="$Default", event_position=EventPosition.first_available_event(), exclusive_receiver_priority=None,
operation=None, prefetch=None, loop=None):
"""
Add an async receiver to the client for a particular consumer group and partition.
Create an async receiver to the client for a particular consumer group and partition.
:param consumer_group: The name of the consumer group.
:param partition_id: The ID of the partition.
:type partition_id: str
:param consumer_group: The name of the consumer group. Default value is `$Default`.
:type consumer_group: str
:param partition: The ID of the partition.
:type partition: str
:param event_position: The position from which to start receiving.
:type event_position: ~azure.eventhub.common.EventPosition
:param prefetch: The message prefetch count of the receiver. Default is 300.
:type prefetch: int
:operation: An optional operation to be appended to the hostname in the source URL.
:param exclusive_receiver_priority: The priority of the exclusive receiver. The client will create an exclusive
receiver if exclusive_receiver_priority is set.
:type exclusive_receiver_priority: int
:param operation: An optional operation to be appended to the hostname in the source URL.
The value must start with `/` character.
:type operation: str
:rtype: ~azure.eventhub.aio.receiver_async.ReceiverAsync
:param prefetch: The message prefetch count of the receiver. Default is 300.
:type prefetch: int
:param loop: An event loop. If not specified the default event loop will be used.
:rtype: ~azure.eventhub.aio.receiver_async.EventReceiver
Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
Expand All @@ -200,35 +212,30 @@ def create_receiver(
path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition_id)
handler = Receiver(
self, source_url, offset=event_position, exclusive_receiver_priority=exclusive_receiver_priority,
handler = EventReceiver(
self, source_url, event_position=event_position, exclusive_receiver_priority=exclusive_receiver_priority,
prefetch=prefetch, loop=loop)
return handler

def create_sender(
self, partition_id=None, operation=None, send_timeout=None, loop=None):
"""
Add an async sender to the client to send ~azure.eventhub.common.EventData object
Create an async sender to the client to send ~azure.eventhub.common.EventData object
to an EventHub.
:param partition: Optionally specify a particular partition to send to.
:param partition_id: Optionally specify a particular partition to send to.
If omitted, the events will be distributed to available partitions via
round-robin.
:type partition: str
:operation: An optional operation to be appended to the hostname in the target URL.
:type partition_id: str
:param operation: An optional operation to be appended to the hostname in the target URL.
The value must start with `/` character.
:type operation: str
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
:type send_timeout: int
:param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
periods of inactivity. The default value is 30 seconds. If set to `None`, the connection will not
be pinged.
:type keep_alive: int
:param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
Default value is `True`.
:type auto_reconnect: bool
:rtype: ~azure.eventhub.aio.sender_async.SenderAsync
:type send_timeout: float
:param loop: An event loop. If not specified the default event loop will be used.
:rtype ~azure.eventhub.aio.sender_async.EventSender
Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
Expand All @@ -245,6 +252,6 @@ def create_sender(
target = target + operation
send_timeout = self.config.send_timeout if send_timeout is None else send_timeout

handler = Sender(
handler = EventSender(
self, target, partition=partition_id, send_timeout=send_timeout, loop=loop)
return handler
Loading

0 comments on commit b77d019

Please sign in to comment.