Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error hierarchy, sample code and docstring #5743

Merged
merged 25 commits into from
Jun 14, 2019
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,30 @@
__version__ = "1.3.1"

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, AuthenticationError
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
AuthenticationError, EventDataSendError, ConnectionLostError
from azure.eventhub.client import EventHubClient
from azure.eventhub.sender import Sender
from azure.eventhub.receiver import Receiver
from azure.eventhub.sender import EventSender
from azure.eventhub.receiver import EventReceiver
from .constants import MessageSendResult
from .constants import TransportType
from .common import FIRST_AVAILABLE, NEW_EVENTS_ONLY, SharedKeyCredentials, SASTokenCredentials
from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential

__all__ = [
"__version__",
"EventData",
"EventHubError",
"ConnectError",
"ConnectionLostError",
"EventDataError",
"EventDataSendError",
"AuthenticationError",
"EventPosition",
"EventHubClient",
"Sender",
"Receiver",
"EventSender",
"EventReceiver",
"MessageSendResult",
"TransportType",
"FIRST_AVAILABLE", "NEW_EVENTS_ONLY",
"SharedKeyCredentials",
"SASTokenCredentials",
"EventHubSharedKeyCredential",
"EventHubSASTokenCredential",
]
8 changes: 4 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from .event_hubs_client_async import EventHubClient
from .receiver_async import Receiver
from .sender_async import Sender
from .receiver_async import EventReceiver
from .sender_async import EventSender

__all__ = [
"EventHubClient",
"Receiver",
"Sender"
"EventReceiver",
"EventSender"
]
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
AMQPClientAsync,
)

from azure.eventhub.common import parse_sas_token, SharedKeyCredentials, SASTokenCredentials
from azure.eventhub.common import parse_sas_token, EventPosition, EventHubSharedKeyCredential, EventHubSASTokenCredential
from azure.eventhub import (
EventHubError)
from ..client_abstract import EventHubClientAbstract

from .sender_async import Sender
from .receiver_async import Receiver
from .sender_async import EventSender
from .receiver_async import EventReceiver


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,7 +56,7 @@ def _create_auth(self, username=None, password=None):
transport_type = self.config.transport_type
auth_timeout = self.config.auth_timeout

if isinstance(self.credential, SharedKeyCredentials):
if isinstance(self.credential, EventHubSharedKeyCredential):
username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
Expand All @@ -66,7 +66,7 @@ def _create_auth(self, username=None, password=None):
self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy,
transport_type=transport_type)

elif isinstance(self.credential, SASTokenCredentials):
elif isinstance(self.credential, EventHubSASTokenCredential):
token = self.credential.get_sas_token()
try:
expiry = int(parse_sas_token(token)['se'])
Expand All @@ -85,10 +85,14 @@ def _create_auth(self, username=None, password=None):
get_jwt_token, http_proxy=http_proxy,
transport_type=transport_type)


async def get_properties(self):
"""
Get details on the specified EventHub async.
Get properties of the specified EventHub async.
Keys in the details dictionary include:

-'path'
-'created_at'
-'partition_ids'

:rtype: dict
"""
Expand Down Expand Up @@ -117,21 +121,25 @@ async def get_properties(self):
await mgmt_client.close_async()

async def get_partition_ids(self):
"""
Get partition ids of the specified EventHub async.

:rtype: list[str]
"""
return (await self.get_properties())['partition_ids']

async def get_partition_properties(self, partition):
"""
Get information on the specified partition async.
Get properties of the specified partition async.
Keys in the details dictionary include:

-'name'
-'type'
-'partition'
-'begin_sequence_number'
-'event_hub_path'
-'id'
-'beginning_sequence_number'
-'last_enqueued_sequence_number'
-'last_enqueued_offset'
-'last_enqueued_time_utc'
-'is_partition_empty'
-'is_empty'

:param partition: The target partition id.
:type partition: str
Expand Down Expand Up @@ -163,28 +171,33 @@ async def get_partition_properties(self, partition):
output['last_enqueued_time_utc'] = datetime.datetime.utcfromtimestamp(
float(partition_info[b'last_enqueued_time_utc'] / 1000))
output['is_empty'] = partition_info[b'is_partition_empty']
output['retrieval_time'] = datetime.datetime.utcnow()
return output
finally:
await mgmt_client.close_async()

def create_receiver(
self, partition_id, consumer_group="$Default", event_position=None, exclusive_receiver_priority=None, operation=None,
prefetch=None, loop=None):
self, partition_id, consumer_group="$Default", event_position=EventPosition.first_available_event(), exclusive_receiver_priority=None,
operation=None, prefetch=None, loop=None):
"""
Add an async receiver to the client for a particular consumer group and partition.
Create an async receiver to the client for a particular consumer group and partition.

:param consumer_group: The name of the consumer group.
:param partition_id: The ID of the partition.
:type partition_id: str
:param consumer_group: The name of the consumer group. Default value is `$Default`.
:type consumer_group: str
:param partition: The ID of the partition.
:type partition: str
:param event_position: The position from which to start receiving.
:type event_position: ~azure.eventhub.common.EventPosition
:param prefetch: The message prefetch count of the receiver. Default is 300.
:type prefetch: int
:operation: An optional operation to be appended to the hostname in the source URL.
:param exclusive_receiver_priority: The priority of the exclusive receiver. The client will create an exclusive
receiver if exclusive_receiver_priority is set.
:type exclusive_receiver_priority: int
:param operation: An optional operation to be appended to the hostname in the source URL.
The value must start with `/` character.
:type operation: str
:rtype: ~azure.eventhub.aio.receiver_async.ReceiverAsync
:param prefetch: The message prefetch count of the receiver. Default is 300.
:type prefetch: int
:param loop: An event loop. If not specified the default event loop will be used.
:rtype: ~azure.eventhub.aio.receiver_async.EventReceiver

Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
Expand All @@ -200,35 +213,30 @@ def create_receiver(
path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition_id)
handler = Receiver(
self, source_url, offset=event_position, exclusive_receiver_priority=exclusive_receiver_priority,
handler = EventReceiver(
self, source_url, event_position=event_position, exclusive_receiver_priority=exclusive_receiver_priority,
prefetch=prefetch, loop=loop)
return handler

def create_sender(
self, partition_id=None, operation=None, send_timeout=None, loop=None):
"""
Add an async sender to the client to send ~azure.eventhub.common.EventData object
Create an async sender to the client to send ~azure.eventhub.common.EventData object
to an EventHub.

:param partition: Optionally specify a particular partition to send to.
:param partition_id: Optionally specify a particular partition to send to.
If omitted, the events will be distributed to available partitions via
round-robin.
:type partition: str
:operation: An optional operation to be appended to the hostname in the target URL.
:type partition_id: str
:param operation: An optional operation to be appended to the hostname in the target URL.
The value must start with `/` character.
:type operation: str
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
:type send_timeout: int
:param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
periods of inactivity. The default value is 30 seconds. If set to `None`, the connection will not
be pinged.
:type keep_alive: int
:param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
Default value is `True`.
:type auto_reconnect: bool
:rtype: ~azure.eventhub.aio.sender_async.SenderAsync
:param loop: An event loop. If not specified the default event loop will be used.
:rtype ~azure.eventhub.aio.sender_async.EventSender


Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
Expand All @@ -245,6 +253,6 @@ def create_sender(
target = target + operation
send_timeout = self.config.send_timeout if send_timeout is None else send_timeout

handler = Sender(
handler = EventSender(
self, target, partition=partition_id, send_timeout=send_timeout, loop=loop)
return handler
Loading